Syncing ephemeral element updates

This commit is contained in:
Marcel Mraz 2025-01-20 15:07:37 +01:00
parent c57249481e
commit 310a9ae4e0
No known key found for this signature in database
GPG key ID: 4EBD6E62DC830CD2
60 changed files with 1104 additions and 906 deletions

View file

@ -6,18 +6,15 @@ import ReconnectingWebSocket, {
} from "reconnecting-websocket";
import { Utils } from "./utils";
import {
SyncQueue,
LocalDeltasQueue,
type MetadataRepository,
type IncrementsRepository,
type DeltasRepository,
} from "./queue";
import { StoreIncrement } from "../store";
import { SnapshotAction, StoreDelta } from "../store";
import type { StoreChange } from "../store";
import type { ExcalidrawImperativeAPI } from "../types";
import type { SceneElementsMap } from "../element/types";
import type {
CLIENT_INCREMENT,
CLIENT_MESSAGE_RAW,
SERVER_INCREMENT,
} from "./protocol";
import type { CLIENT_MESSAGE_RAW, SERVER_DELTA, CHANGE } from "./protocol";
import { debounce } from "../utils";
import { randomId } from "../random";
@ -38,12 +35,6 @@ class SocketClient {
// thus working with a slighter smaller limit of 800 kB (leaving 224kB for metadata)
private static readonly MAX_MESSAGE_SIZE = 800_000;
private static readonly NORMAL_CLOSURE_CODE = 1000;
// Chrome throws "Uncaught InvalidAccessError" with message:
// "The close code must be either 1000, or between 3000 and 4999. 1009 is neither."
// therefore using custom codes instead.
private static readonly MESSAGE_IS_TOO_LARGE_ERROR_CODE = 3009;
private isOffline = true;
private socket: ReconnectingWebSocket | null = null;
@ -129,11 +120,11 @@ class SocketClient {
public send(message: {
type: "relay" | "pull" | "push";
payload: any;
payload: Record<string, unknown>;
}): void {
if (this.isOffline) {
// connection opened, don't let the WS buffer the messages,
// as we do explicitly buffer unacknowledged increments
// as we do explicitly buffer unacknowledged deltas
return;
}
@ -145,6 +136,7 @@ class SocketClient {
const { type, payload } = message;
// CFDO II: could be slowish for large payloads, thing about a better solution (i.e. msgpack 10x faster, 2x smaller)
const stringifiedPayload = JSON.stringify(payload);
const payloadSize = new TextEncoder().encode(stringifiedPayload).byteLength;
@ -193,8 +185,8 @@ class SocketClient {
};
}
interface AcknowledgedIncrement {
increment: StoreIncrement;
interface AcknowledgedDelta {
delta: StoreDelta;
version: number;
}
@ -208,21 +200,19 @@ export class SyncClient {
: "test_room_prod";
private readonly api: ExcalidrawImperativeAPI;
private readonly queue: SyncQueue;
private readonly localDeltas: LocalDeltasQueue;
private readonly metadata: MetadataRepository;
private readonly client: SocketClient;
// #region ACKNOWLEDGED INCREMENTS & METADATA
// #region ACKNOWLEDGED DELTAS & METADATA
// CFDO: shouldn't be stateful, only request / response
private readonly acknowledgedIncrementsMap: Map<
string,
AcknowledgedIncrement
> = new Map();
private readonly acknowledgedDeltasMap: Map<string, AcknowledgedDelta> =
new Map();
public get acknowledgedIncrements() {
return Array.from(this.acknowledgedIncrementsMap.values())
public get acknowledgedDeltas() {
return Array.from(this.acknowledgedDeltasMap.values())
.sort((a, b) => (a.version < b.version ? -1 : 1))
.map((x) => x.increment);
.map((x) => x.delta);
}
private _lastAcknowledgedVersion = 0;
@ -240,12 +230,12 @@ export class SyncClient {
private constructor(
api: ExcalidrawImperativeAPI,
repository: MetadataRepository,
queue: SyncQueue,
queue: LocalDeltasQueue,
options: { host: string; roomId: string; lastAcknowledgedVersion: number },
) {
this.api = api;
this.metadata = repository;
this.queue = queue;
this.localDeltas = queue;
this.lastAcknowledgedVersion = options.lastAcknowledgedVersion;
this.client = new SocketClient(options.host, options.roomId, {
onOpen: this.onOpen,
@ -257,16 +247,16 @@ export class SyncClient {
// #region SYNC_CLIENT FACTORY
public static async create(
api: ExcalidrawImperativeAPI,
repository: IncrementsRepository & MetadataRepository,
repository: DeltasRepository & MetadataRepository,
) {
const queue = await SyncQueue.create(repository);
const queue = await LocalDeltasQueue.create(repository);
// CFDO: temporary for custom roomId (though E+ will be similar)
const roomId = window.location.pathname.split("/").at(-1);
return new SyncClient(api, repository, queue, {
host: SyncClient.HOST_URL,
roomId: roomId ?? SyncClient.ROOM_ID,
// CFDO: temporary, so that all increments are loaded and applied on init
// CFDO: temporary, so that all deltas are loaded and applied on init
lastAcknowledgedVersion: 0,
});
}
@ -290,26 +280,27 @@ export class SyncClient {
});
}
public push(increment?: StoreIncrement): void {
if (increment) {
this.queue.add(increment);
public push(delta?: StoreDelta): void {
if (delta) {
this.localDeltas.add(delta);
}
// re-send all already queued increments
for (const queuedIncrement of this.queue.getAll()) {
// re-send all already queued deltas
for (const delta of this.localDeltas.getAll()) {
this.client.send({
type: "push",
payload: {
...queuedIncrement,
...delta,
},
});
}
}
public relay(buffer: ArrayBuffer): void {
// CFDO: should be throttled! 60 fps for live scenes, 10s or so for single player
public relay(change: StoreChange): void {
this.client.send({
type: "relay",
payload: { buffer },
payload: { ...change },
});
}
// #endregion
@ -349,76 +340,110 @@ export class SyncClient {
}
};
// CFDO: refactor by applying all operations to store, not to the elements
private handleAcknowledged = (payload: {
increments: Array<SERVER_INCREMENT>;
}) => {
let nextAcknowledgedVersion = this.lastAcknowledgedVersion;
let elements = new Map(
// CFDO: retrieve the map already
private handleRelayed = (payload: CHANGE) => {
// CFDO: retrieve the map already
const nextElements = new Map(
this.api.getSceneElementsIncludingDeleted().map((el) => [el.id, el]),
) as SceneElementsMap;
try {
const { increments: remoteIncrements } = payload;
const { elements: relayedElements } = payload;
// apply remote increments
for (const { id, version, payload } of remoteIncrements) {
// CFDO: temporary to load all increments on init
this.acknowledgedIncrementsMap.set(id, {
increment: StoreIncrement.load(payload),
for (const [id, relayedElement] of Object.entries(relayedElements)) {
const existingElement = nextElements.get(id);
if (
!existingElement || // new element
existingElement.version < relayedElement.version // updated element
) {
nextElements.set(id, relayedElement);
}
}
this.api.updateScene({
elements: Array.from(nextElements.values()),
snapshotAction: SnapshotAction.UPDATE,
});
} catch (e) {
console.error("Failed to apply relayed change:", e);
}
};
// CFDO: refactor by applying all operations to store, not to the elements
private handleAcknowledged = (payload: { deltas: Array<SERVER_DELTA> }) => {
let prevSnapshot = this.api.store.snapshot;
try {
const remoteDeltas = Array.from(payload.deltas);
const applicableDeltas: Array<StoreDelta> = [];
const appState = this.api.getAppState();
let nextAcknowledgedVersion = this.lastAcknowledgedVersion;
let nextElements = new Map(
// CFDO: retrieve the map already
this.api.getSceneElementsIncludingDeleted().map((el) => [el.id, el]),
) as SceneElementsMap;
for (const { id, version, payload } of remoteDeltas) {
// CFDO: temporary to load all deltas on init
this.acknowledgedDeltasMap.set(id, {
delta: StoreDelta.load(payload),
version,
});
// we've already applied this increment
// we've already applied this delta!
if (version <= nextAcknowledgedVersion) {
continue;
}
if (version === nextAcknowledgedVersion + 1) {
nextAcknowledgedVersion = version;
} else {
// it's fine to apply increments our of order,
// as they are idempontent, so that we can re-apply them again,
// as long as we don't mark their version as acknowledged
console.debug(
`Received out of order increment, expected "${
// CFDO:strictly checking for out of order deltas; might be relaxed if it becomes a problem
if (version !== nextAcknowledgedVersion + 1) {
throw new Error(
`Received out of order delta, expected "${
nextAcknowledgedVersion + 1
}", but received "${version}"`,
);
}
// local increment shall not have to be applied again
if (this.queue.has(id)) {
this.queue.remove(id);
if (this.localDeltas.has(id)) {
// local delta does not have to be applied again
this.localDeltas.remove(id);
} else {
// apply remote increment with higher version than the last acknowledged one
const remoteIncrement = StoreIncrement.load(payload);
[elements] = remoteIncrement.elementsChange.applyTo(
elements,
this.api.store.snapshot.elements,
);
// this is a new remote delta, adding it to the list of applicable deltas
const remoteDelta = StoreDelta.load(payload);
applicableDeltas.push(remoteDelta);
}
// apply local increments
for (const localIncrement of this.queue.getAll()) {
// CFDO: in theory only necessary when remote increments modified same element properties!
[elements] = localIncrement.elementsChange.applyTo(
elements,
this.api.store.snapshot.elements,
);
}
this.api.updateScene({
elements: Array.from(elements.values()),
storeAction: "update",
});
nextAcknowledgedVersion = version;
}
// adding all yet unacknowledged local deltas
const localDeltas = this.localDeltas.getAll();
applicableDeltas.push(...localDeltas);
for (const delta of applicableDeltas) {
[nextElements] = this.api.store.applyDeltaTo(
delta,
nextElements,
appState,
);
prevSnapshot = this.api.store.snapshot;
}
// CFDO: I still need to filter out uncomitted elements
// I still need to update snapshot with the new elements
this.api.updateScene({
elements: Array.from(nextElements.values()),
snapshotAction: SnapshotAction.NONE,
});
this.lastAcknowledgedVersion = nextAcknowledgedVersion;
} catch (e) {
console.error("Failed to apply acknowledged increments:", e);
// CFDO: might just be on error
console.error("Failed to apply acknowledged deltas:", e);
// rollback to the previous snapshot, so that we don't end up in an incosistent state
this.api.store.snapshot = prevSnapshot;
// schedule another fresh pull in case of a failure
this.schedulePull();
}
};
@ -427,17 +452,10 @@ export class SyncClient {
ids: Array<string>;
message: string;
}) => {
// handle rejected increments
// handle rejected deltas
console.error("Rejected message received:", payload);
};
private handleRelayed = (payload: {
increments: Array<CLIENT_INCREMENT>;
}) => {
// apply relayed increments / buffer
console.log("Relayed message received:", payload);
};
private schedulePull = debounce(() => this.pull(), 1000);
// #endregion
}