Cache received changes, ignore snapshot cache for durable changes, revert StoreAction, history fix, indices fix

This commit is contained in:
Marcel Mraz 2025-01-21 11:34:42 +01:00
parent 310a9ae4e0
commit 7e0f5b6369
No known key found for this signature in database
GPG key ID: 4EBD6E62DC830CD2
50 changed files with 437 additions and 338 deletions

View file

@ -10,10 +10,10 @@ import {
type MetadataRepository,
type DeltasRepository,
} from "./queue";
import { SnapshotAction, StoreDelta } from "../store";
import { StoreAction, StoreDelta } from "../store";
import type { StoreChange } from "../store";
import type { ExcalidrawImperativeAPI } from "../types";
import type { SceneElementsMap } from "../element/types";
import type { ExcalidrawElement, SceneElementsMap } from "../element/types";
import type { CLIENT_MESSAGE_RAW, SERVER_DELTA, CHANGE } from "./protocol";
import { debounce } from "../utils";
import { randomId } from "../random";
@ -38,7 +38,7 @@ class SocketClient {
private isOffline = true;
private socket: ReconnectingWebSocket | null = null;
private get isDisconnected() {
public get isDisconnected() {
return !this.socket;
}
@ -204,6 +204,11 @@ export class SyncClient {
private readonly metadata: MetadataRepository;
private readonly client: SocketClient;
private relayedElementsVersionsCache = new Map<
string,
ExcalidrawElement["version"]
>();
// #region ACKNOWLEDGED DELTAS & METADATA
// CFDO: shouldn't be stateful, only request / response
private readonly acknowledgedDeltasMap: Map<string, AcknowledgedDelta> =
@ -264,11 +269,12 @@ export class SyncClient {
// #region PUBLIC API METHODS
public connect() {
return this.client.connect();
this.client.connect();
}
public disconnect() {
return this.client.disconnect();
this.client.disconnect();
this.relayedElementsVersionsCache.clear();
}
public pull(sinceVersion?: number): void {
@ -298,6 +304,32 @@ export class SyncClient {
// CFDO: should be throttled! 60 fps for live scenes, 10s or so for single player
public relay(change: StoreChange): void {
if (this.client.isDisconnected) {
// don't reconnect if we're explicitly disconnected
// otherwise versioning slider would trigger sync on every slider step
return;
}
let shouldRelay = false;
for (const [id, element] of Object.entries(change.elements)) {
const cachedElementVersion = this.relayedElementsVersionsCache.get(id);
if (!cachedElementVersion || cachedElementVersion < element.version) {
this.relayedElementsVersionsCache.set(id, element.version);
if (!shouldRelay) {
// it's enough that a single element is not cached or is outdated in cache
// to relay the whole change, otherwise we skip the relay as we've already received this change
shouldRelay = true;
}
}
}
if (!shouldRelay) {
return;
}
this.client.send({
type: "relay",
payload: { ...change },
@ -357,12 +389,13 @@ export class SyncClient {
existingElement.version < relayedElement.version // updated element
) {
nextElements.set(id, relayedElement);
this.relayedElementsVersionsCache.set(id, relayedElement.version);
}
}
this.api.updateScene({
elements: Array.from(nextElements.values()),
snapshotAction: SnapshotAction.UPDATE,
storeAction: StoreAction.UPDATE,
});
} catch (e) {
console.error("Failed to apply relayed change:", e);
@ -426,16 +459,22 @@ export class SyncClient {
delta,
nextElements,
appState,
{
triggerIncrement: false,
updateSnapshot: true,
},
);
prevSnapshot = this.api.store.snapshot;
}
// CFDO: I still need to filter out uncomitted elements
// I still need to update snapshot with the new elements
// CFDO: might need to restore first due to potentially stale delta versions
this.api.updateScene({
elements: Array.from(nextElements.values()),
snapshotAction: SnapshotAction.NONE,
// even though the snapshot should be up-to-date already,
// still some more updates might be triggered,
// i.e. as a result from syncing invalid indices
storeAction: StoreAction.UPDATE,
});
this.lastAcknowledgedVersion = nextAcknowledgedVersion;