From f12ed8e0b2fe6bdfdf81b5c019b655d25a089b23 Mon Sep 17 00:00:00 2001 From: Marcel Mraz Date: Tue, 26 Nov 2024 22:51:19 +0100 Subject: [PATCH] WIP sync client --- excalidraw-app/App.tsx | 12 +- excalidraw-app/collab/Collab.tsx | 6 + packages/excalidraw/.gitignore | 1 + packages/excalidraw/change.ts | 97 ++++--- packages/excalidraw/cloudflare/changes.ts | 2 +- packages/excalidraw/components/App.tsx | 1 + packages/excalidraw/package.json | 6 +- packages/excalidraw/sync/client.ts | 315 ++++++++++++++++------ packages/excalidraw/sync/protocol.ts | 13 +- packages/excalidraw/sync/server.ts | 3 +- packages/excalidraw/types.ts | 1 + yarn.lock | 16 +- 12 files changed, 327 insertions(+), 146 deletions(-) diff --git a/excalidraw-app/App.tsx b/excalidraw-app/App.tsx index f731cae96c..3165a069b5 100644 --- a/excalidraw-app/App.tsx +++ b/excalidraw-app/App.tsx @@ -54,6 +54,7 @@ import Collab, { collabAPIAtom, isCollaboratingAtom, isOfflineAtom, + syncAPIAtom, } from "./collab/Collab"; import { exportToBackend, @@ -363,11 +364,20 @@ const ExcalidrawWrapper = () => { const [, setShareDialogState] = useAtom(shareDialogStateAtom); const [collabAPI] = useAtom(collabAPIAtom); + const [syncAPI] = useAtom(syncAPIAtom); const [isCollaborating] = useAtomWithInitialValue(isCollaboratingAtom, () => { return isCollaborationLink(window.location.href); }); const collabError = useAtomValue(collabErrorIndicatorAtom); + useEffect(() => { + syncAPI?.reconnect(); + + return () => { + syncAPI?.disconnect(); + }; + }, [syncAPI]); + useHandleLibrary({ excalidrawAPI, adapter: LibraryIndexedDBAdapter, @@ -671,7 +681,7 @@ const ExcalidrawWrapper = () => { // some appState like selections should also be transfered (we could even persist it) if (!elementsChange.isEmpty()) { - console.log(elementsChange); + syncAPI?.push("durable", [elementsChange]); } }; diff --git a/excalidraw-app/collab/Collab.tsx b/excalidraw-app/collab/Collab.tsx index 1097aeda9e..60c9b09fba 100644 --- a/excalidraw-app/collab/Collab.tsx +++ b/excalidraw-app/collab/Collab.tsx @@ -88,7 +88,9 @@ import type { ReconciledExcalidrawElement, RemoteExcalidrawElement, } from "../../packages/excalidraw/data/reconcile"; +import { ExcalidrawSyncClient } from "../../packages/excalidraw/sync/client"; +export const syncAPIAtom = atom(null); export const collabAPIAtom = atom(null); export const isCollaboratingAtom = atom(false); export const isOfflineAtom = atom(false); @@ -234,6 +236,10 @@ class Collab extends PureComponent { }; appJotaiStore.set(collabAPIAtom, collabAPI); + appJotaiStore.set( + syncAPIAtom, + new ExcalidrawSyncClient(this.excalidrawAPI), + ); if (import.meta.env.MODE === ENV.TEST || import.meta.env.DEV) { window.collab = window.collab || ({} as Window["collab"]); diff --git a/packages/excalidraw/.gitignore b/packages/excalidraw/.gitignore index 971fcb7d34..176ebdb8d4 100644 --- a/packages/excalidraw/.gitignore +++ b/packages/excalidraw/.gitignore @@ -1,2 +1,3 @@ node_modules types +.wrangler diff --git a/packages/excalidraw/change.ts b/packages/excalidraw/change.ts index bfd0a262ab..2ab3fd1ff3 100644 --- a/packages/excalidraw/change.ts +++ b/packages/excalidraw/change.ts @@ -32,6 +32,7 @@ import type { } from "./element/types"; import { orderByFractionalIndex, syncMovedIndices } from "./fractionalIndex"; import { getNonDeletedGroupIds } from "./groups"; +import { randomId } from "./random"; import { getObservedAppState } from "./store"; import type { AppState, @@ -795,27 +796,33 @@ export class AppStateChange implements Change { } } -type ElementPartial = Omit< - ElementUpdate>, - "seed" ->; +type ElementPartial = + ElementUpdate>; /** * Elements change is a low level primitive to capture a change between two sets of elements. * It does so by encapsulating forward and backward `Delta`s, allowing to time-travel in both directions. */ export class ElementsChange implements Change { + public readonly id: string; + private constructor( private readonly added: Record>, private readonly removed: Record>, private readonly updated: Record>, - ) {} + options: { changeId: string }, + ) { + this.id = options.changeId; + } public static create( added: Record>, removed: Record>, updated: Record>, - options = { shouldRedistribute: false }, + options: { changeId: string; shouldRedistribute: boolean } = { + changeId: randomId(), + shouldRedistribute: false, + }, ) { let change: ElementsChange; @@ -840,9 +847,13 @@ export class ElementsChange implements Change { } } - change = new ElementsChange(nextAdded, nextRemoved, nextUpdated); + change = new ElementsChange(nextAdded, nextRemoved, nextUpdated, { + changeId: options.changeId, + }); } else { - change = new ElementsChange(added, removed, updated); + change = new ElementsChange(added, removed, updated, { + changeId: options.changeId, + }); } if (import.meta.env.DEV || import.meta.env.MODE === ENV.TEST) { @@ -985,12 +996,13 @@ export class ElementsChange implements Change { return ElementsChange.create({}, {}, {}); } - public static load(data: { - added: Record>; - removed: Record>; - updated: Record>; - }) { - return ElementsChange.create(data.added, data.removed, data.updated); + public static load(payload: string) { + const { id, added, removed, updated } = JSON.parse(payload); + + return ElementsChange.create(added, removed, updated, { + changeId: id, + shouldRedistribute: false, + }); } public inverse(): ElementsChange { @@ -1077,6 +1089,7 @@ export class ElementsChange implements Change { const updated = applyLatestChangesInternal(this.updated); return ElementsChange.create(added, removed, updated, { + changeId: this.id, shouldRedistribute: true, // redistribute the deltas as `isDeleted` could have been updated }); } @@ -1101,9 +1114,9 @@ export class ElementsChange implements Change { flags, ); - const addedElements = applyDeltas(this.added); - const removedElements = applyDeltas(this.removed); - const updatedElements = applyDeltas(this.updated); + const addedElements = applyDeltas("added", this.added); + const removedElements = applyDeltas("removed", this.removed); + const updatedElements = applyDeltas("updated", this.updated); const affectedElements = this.resolveConflicts(elements, nextElements); @@ -1156,22 +1169,27 @@ export class ElementsChange implements Change { } } - private static createApplier = ( - nextElements: SceneElementsMap, - snapshot: Map, - flags: { - containsVisibleDifference: boolean; - containsZindexDifference: boolean; - }, - ) => { - const getElement = ElementsChange.createGetter( - nextElements, - snapshot, - flags, - ); + private static createApplier = + ( + nextElements: SceneElementsMap, + snapshot: Map, + flags: { + containsVisibleDifference: boolean; + containsZindexDifference: boolean; + }, + ) => + ( + type: "added" | "removed" | "updated", + deltas: Record>, + ) => { + const getElement = ElementsChange.createGetter( + type, + nextElements, + snapshot, + flags, + ); - return (deltas: Record>) => - Object.entries(deltas).reduce((acc, [id, delta]) => { + return Object.entries(deltas).reduce((acc, [id, delta]) => { const element = getElement(id, delta.inserted); if (element) { @@ -1182,10 +1200,11 @@ export class ElementsChange implements Change { return acc; }, new Map()); - }; + }; private static createGetter = ( + type: "added" | "removed" | "updated", elements: SceneElementsMap, snapshot: Map, flags: { @@ -1211,6 +1230,15 @@ export class ElementsChange implements Change { ) { flags.containsVisibleDifference = true; } + } else if (type === "added") { + // for additions the element does not have to exist (i.e. remote update) + // TODO: the version itself might be different! + element = newElementWith( + { id, version: 1 } as OrderedExcalidrawElement, + { + ...partial, + }, + ); } } @@ -1574,8 +1602,7 @@ export class ElementsChange implements Change { private static stripIrrelevantProps( partial: Partial, ): ElementPartial { - const { id, updated, version, versionNonce, seed, ...strippedPartial } = - partial; + const { id, updated, version, versionNonce, ...strippedPartial } = partial; return strippedPartial; } diff --git a/packages/excalidraw/cloudflare/changes.ts b/packages/excalidraw/cloudflare/changes.ts index c7e1bcc00c..7f15d40f5f 100644 --- a/packages/excalidraw/cloudflare/changes.ts +++ b/packages/excalidraw/cloudflare/changes.ts @@ -8,7 +8,7 @@ import type { export class DurableChangesRepository implements ChangesRepository { constructor(private storage: DurableObjectStorage) { // #region DEV ONLY - this.storage.sql.exec(`DROP TABLE IF EXISTS changes;`); + // this.storage.sql.exec(`DROP TABLE IF EXISTS changes;`); // #endregion this.storage.sql.exec(`CREATE TABLE IF NOT EXISTS changes( diff --git a/packages/excalidraw/components/App.tsx b/packages/excalidraw/components/App.tsx index debc84f502..248e99eaa7 100644 --- a/packages/excalidraw/components/App.tsx +++ b/packages/excalidraw/components/App.tsx @@ -719,6 +719,7 @@ class App extends React.Component { addFiles: this.addFiles, resetScene: this.resetScene, getSceneElementsIncludingDeleted: this.getSceneElementsIncludingDeleted, + store: this.store, history: { clear: this.resetHistory, }, diff --git a/packages/excalidraw/package.json b/packages/excalidraw/package.json index 174ff44c4d..cd9ed934a6 100644 --- a/packages/excalidraw/package.json +++ b/packages/excalidraw/package.json @@ -140,8 +140,8 @@ "start": "node ../../scripts/buildExample.mjs && vite", "build:example": "node ../../scripts/buildExample.mjs", "size": "yarn build:umd && size-limit", - "cf:deploy": "wrangler deploy", - "cf:dev": "wrangler dev", - "cf:typegen": "wrangler types" + "sync:deploy": "wrangler deploy", + "sync:dev": "wrangler dev", + "sync:typegen": "wrangler types" } } diff --git a/packages/excalidraw/sync/client.ts b/packages/excalidraw/sync/client.ts index 326f6b9e01..eecfbde554 100644 --- a/packages/excalidraw/sync/client.ts +++ b/packages/excalidraw/sync/client.ts @@ -1,44 +1,147 @@ +/* eslint-disable no-console */ import { Utils } from "./utils"; -import type { CLIENT_CHANGE, SERVER_CHANGE } from "./protocol"; +import { ElementsChange } from "../change"; +import type { ExcalidrawImperativeAPI } from "../types"; +import type { SceneElementsMap } from "../element/types"; +import type { CLIENT_CHANGE, PUSH_PAYLOAD, SERVER_CHANGE } from "./protocol"; +import throttle from "lodash.throttle"; -class ExcalidrawSyncClient { +export class ExcalidrawSyncClient { // TODO: add prod url private static readonly HOST_URL = "ws://localhost:8787"; + private static readonly RECONNECT_INTERVAL = 10_000; - private roomId: string; - private lastAcknowledgedVersion: number; + private lastAcknowledgedVersion = 0; + + private readonly api: ExcalidrawImperativeAPI; + private readonly roomId: string; + private readonly queuedChanges: Map = new Map(); + private get localChanges() { + return Array.from(this.queuedChanges.values()); + } private server: WebSocket | null = null; + private get isConnected() { + return this.server?.readyState === WebSocket.OPEN; + } - constructor(roomId: string = "test_room_1") { + private isConnecting: { done: (error?: Error) => void } | null = null; + + constructor(api: ExcalidrawImperativeAPI, roomId: string = "test_room_1") { + this.api = api; this.roomId = roomId; // TODO: persist in idb this.lastAcknowledgedVersion = 0; } - public connect() { - this.server = new WebSocket( - `${ExcalidrawSyncClient.HOST_URL}/connect?roomId=${this.roomId}`, + public reconnect = throttle( + async () => { + try { + if (this.isConnected) { + console.debug("Already connected to the sync server."); + return; + } + + if (this.isConnecting !== null) { + console.debug("Already reconnecting to the sync server..."); + return; + } + + console.trace("Reconnecting to the sync server..."); + + const isConnecting = { + done: () => {}, + }; + + // ensure there won't be multiple reconnection attempts + this.isConnecting = isConnecting; + + return await new Promise((resolve, reject) => { + this.server = new WebSocket( + `${ExcalidrawSyncClient.HOST_URL}/connect?roomId=${this.roomId}`, + ); + + // wait for 10 seconds before timing out + const timeoutId = setTimeout(() => { + reject("Connecting the sync server timed out"); + }, 10_000); + + // resolved when opened, rejected on error + isConnecting.done = (error?: Error) => { + clearTimeout(timeoutId); + + if (error) { + reject(error); + } else { + resolve(); + } + }; + + this.server.addEventListener("message", this.onMessage); + this.server.addEventListener("close", this.onClose); + this.server.addEventListener("error", this.onError); + this.server.addEventListener("open", this.onOpen); + }); + } catch (e) { + console.error("Failed to connect to sync server:", e); + this.disconnect(e as Error); + } + }, + ExcalidrawSyncClient.RECONNECT_INTERVAL, + { leading: true }, + ); + + public disconnect = throttle( + (error?: Error) => { + try { + this.server?.removeEventListener("message", this.onMessage); + this.server?.removeEventListener("close", this.onClose); + this.server?.removeEventListener("error", this.onError); + this.server?.removeEventListener("open", this.onOpen); + + if (error) { + this.isConnecting?.done(error); + } + } finally { + this.isConnecting = null; + this.server = null; + this.reconnect(); + } + }, + ExcalidrawSyncClient.RECONNECT_INTERVAL, + { leading: true }, + ); + + private onOpen = async () => { + if (!this.isConnected) { + throw new Error( + "Received open event, but the connection is still not ready.", + ); + } + + if (!this.isConnecting) { + throw new Error( + "Can't resolve connection without `isConnecting` callback.", + ); + } + + // resolve the current connection + this.isConnecting.done(); + + // initiate pull + this.pull(); + }; + + private onClose = () => + this.disconnect( + new Error(`Received "closed" event on the sync connection`), ); - this.server.addEventListener("open", this.onOpen); - this.server.addEventListener("message", this.onMessage); - this.server.addEventListener("close", this.onClose); - this.server.addEventListener("error", this.onError); - } - - public disconnect() { - if (this.server) { - this.server.removeEventListener("open", this.onOpen); - this.server.removeEventListener("message", this.onMessage); - this.server.removeEventListener("close", this.onClose); - this.server.removeEventListener("error", this.onError); - this.server.close(); - } - } - - private onOpen = () => this.sync(); + private onError = (event: Event) => + this.disconnect( + new Error(`Received "${event.type}" on the sync connection`), + ); // TODO: could be an array buffer private onMessage = (event: MessageEvent) => { @@ -62,82 +165,126 @@ class ExcalidrawSyncClient { } }; - private onClose = () => this.disconnect(); - private onError = (error: Event) => console.error("WebSocket error:", error); - - public sync() { - const remoteChanges = this.send({ + private pull = (): void => { + this.send({ type: "pull", - payload: { lastAcknowledgedVersion: this.lastAcknowledgedVersion }, + payload: { + lastAcknowledgedVersion: this.lastAcknowledgedVersion, + }, }); - // TODO: apply remote changes - // const localChanges: Array = []; - // // TODO: apply local changes (unacknowledged) - // this.push(localChanges, 'durable'); - } + }; - public pull() { - return this.send({ - type: "pull", - payload: { lastAcknowledgedVersion: this.lastAcknowledgedVersion }, - }); - } + public push = ( + type: "durable" | "ephemeral" = "durable", + changes: Array = [], + ): void => { + const payload: PUSH_PAYLOAD = { type, changes: [] }; - public push(changes: Array, type: "durable" | "ephemeral") { - return this.send({ - type: "push", - payload: { type, changes }, - }); - } + if (type === "durable") { + // TODO: persist in idb (with insertion order) + for (const change of changes) { + this.queuedChanges.set(change.id, change); + } - public relay(buffer: ArrayBuffer) { - return this.send({ + // batch all queued changes + payload.changes = this.localChanges; + } else { + payload.changes = changes; + } + + if (payload.changes.length > 0) { + this.send({ + type: "push", + payload, + }); + } + }; + + public relay(buffer: ArrayBuffer): void { + this.send({ type: "relay", payload: { buffer }, }); } - private handleMessage(message: string) { - const [result, error] = Utils.try(() => JSON.parse(message)); - - if (error) { - console.error("Failed to parse message:", message); - return; - } - - const { type, payload } = result; - switch (type) { - case "relayed": - return this.handleRelayed(payload); - case "acknowledged": - return this.handleAcknowledged(payload); - case "rejected": - return this.handleRejected(payload); - default: - console.error("Unknown message type:", type); - } - } - - private handleRelayed(payload: { changes: Array }) { - console.log("Relayed message received:", payload); - // Process relayed changes - } - + // TODO: refactor by applying all operations to store, not to the elements private handleAcknowledged(payload: { changes: Array }) { - console.log("Acknowledged message received:", payload); - // Handle acknowledged changes + const { changes: remoteChanges } = payload; + + const oldAcknowledgedVersion = this.lastAcknowledgedVersion; + let elements = new Map( + this.api.getSceneElementsIncludingDeleted().map((el) => [el.id, el]), + ) as SceneElementsMap; + + console.log("remote changes", remoteChanges); + console.log("local changes", this.localChanges); + + try { + // apply remote changes + for (const remoteChange of remoteChanges) { + if (this.queuedChanges.has(remoteChange.id)) { + // local change acknowledge by the server, safe to remove + this.queuedChanges.delete(remoteChange.id); + } else { + [elements] = ElementsChange.load(remoteChange.payload).applyTo( + elements, + this.api.store.snapshot.elements, + ); + + // TODO: we might not need to be that strict here + if (this.lastAcknowledgedVersion + 1 !== remoteChange.version) { + throw new Error( + `Received out of order change, expected "${ + this.lastAcknowledgedVersion + 1 + }", but received "${remoteChange.version}"`, + ); + } + } + + this.lastAcknowledgedVersion = remoteChange.version; + } + + // apply local changes + // TODO: only necessary when remote changes modified same element properties! + for (const localChange of this.localChanges) { + [elements] = localChange.applyTo( + elements, + this.api.store.snapshot.elements, + ); + } + + this.api.updateScene({ + elements: Array.from(elements.values()), + storeAction: "update", + }); + + // push all queued changes + this.push(); + } catch (e) { + console.error("Failed to apply acknowledged changes:", e); + // rollback the last acknowledged version + this.lastAcknowledgedVersion = oldAcknowledgedVersion; + // pull again to get the latest changes + this.pull(); + } } private handleRejected(payload: { ids: Array; message: string }) { + // handle rejected changes console.error("Rejected message received:", payload); - // Handle rejected changes } - private send(message: { type: string; payload: any }) { - if (this.server && this.server.readyState === WebSocket.OPEN) { - this.server.send(JSON.stringify(message)); - } else { - console.error("WebSocket is not open. Unable to send message."); + private handleRelayed(payload: { changes: Array }) { + // apply relayed changes / buffer + console.log("Relayed message received:", payload); + } + + private send(message: { type: string; payload: any }): void { + if (!this.isConnected) { + console.error("Can't send a message without an active connection!"); + return; } + + this.server?.send(JSON.stringify(message)); } } diff --git a/packages/excalidraw/sync/protocol.ts b/packages/excalidraw/sync/protocol.ts index 2b39e261de..48b3559d99 100644 --- a/packages/excalidraw/sync/protocol.ts +++ b/packages/excalidraw/sync/protocol.ts @@ -1,3 +1,5 @@ +import type { ElementsChange } from "../change"; + export type RELAY_PAYLOAD = { buffer: ArrayBuffer }; export type PULL_PAYLOAD = { lastAcknowledgedVersion: number }; export type PUSH_PAYLOAD = { @@ -5,11 +7,7 @@ export type PUSH_PAYLOAD = { changes: Array; }; -export type CLIENT_CHANGE = { - id: string; - appStateChange: any; - elementsChange: any; -}; +export type CLIENT_CHANGE = ElementsChange; export type CLIENT_MESSAGE = | { type: "relay"; payload: RELAY_PAYLOAD } @@ -23,7 +21,10 @@ export type SERVER_MESSAGE = payload: { changes: Array } | RELAY_PAYLOAD; } | { type: "acknowledged"; payload: { changes: Array } } - | { type: "rejected"; payload: { ids: Array; message: string } }; + | { + type: "rejected"; + payload: { changes: Array; message: string }; + }; export interface ChangesRepository { saveAll(changes: Array): Array; diff --git a/packages/excalidraw/sync/server.ts b/packages/excalidraw/sync/server.ts index ba47529b8f..59dd6c0d5c 100644 --- a/packages/excalidraw/sync/server.ts +++ b/packages/excalidraw/sync/server.ts @@ -78,6 +78,7 @@ export class ExcalidrawSyncServer { } if (versionĪ” > 0) { + // TODO: for versioning we need deletions, but not for the "snapshot" update const changes = this.changesRepository.getSinceVersion( lastAcknowledgedClientVersion, ); @@ -106,8 +107,8 @@ export class ExcalidrawSyncServer { return this.send(client, { type: "rejected", payload: { - ids: changes.map((i) => i.id), message: error.message, + changes, }, }); } diff --git a/packages/excalidraw/types.ts b/packages/excalidraw/types.ts index a0e10f8ed1..54c6cf6c59 100644 --- a/packages/excalidraw/types.ts +++ b/packages/excalidraw/types.ts @@ -756,6 +756,7 @@ export interface ExcalidrawImperativeAPI { history: { clear: InstanceType["resetHistory"]; }; + store: InstanceType["store"]; getSceneElements: InstanceType["getSceneElements"]; getAppState: () => InstanceType["state"]; getFiles: () => InstanceType["files"]; diff --git a/yarn.lock b/yarn.lock index 9f5dc5fbf9..ea16e43dbe 100644 --- a/yarn.lock +++ b/yarn.lock @@ -10145,27 +10145,13 @@ stringify-object@^3.3.0: is-obj "^1.0.1" is-regexp "^1.0.0" -"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@^6.0.0, strip-ansi@^6.0.1: +"strip-ansi-cjs@npm:strip-ansi@^6.0.1", strip-ansi@6.0.1, strip-ansi@^3.0.0, strip-ansi@^6.0.0, strip-ansi@^6.0.1, strip-ansi@^7.0.1: version "6.0.1" resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-6.0.1.tgz#9e26c63d30f53443e9489495b2105d37b67a85d9" integrity sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A== dependencies: ansi-regex "^5.0.1" -strip-ansi@^3.0.0: - version "3.0.1" - resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-3.0.1.tgz#6a385fb8853d952d5ff05d0e8aaf94278dc63dcf" - integrity sha512-VhumSSbBqDTP8p2ZLKj40UjBCV4+v8bUSEpUb4KjRgWk9pbqGF4REFj6KEagidb2f/M6AzC0EmFyDNGaw9OCzg== - dependencies: - ansi-regex "^2.0.0" - -strip-ansi@^7.0.1: - version "7.1.0" - resolved "https://registry.yarnpkg.com/strip-ansi/-/strip-ansi-7.1.0.tgz#d5b6568ca689d8561370b0707685d22434faff45" - integrity sha512-iq6eVVI64nQQTRYq2KtEg2d2uU7LElhTJwsH4YzIHZshxlgZms/wIc4VoDQTlG/IvVIrBKG06CrZnp0qv7hkcQ== - dependencies: - ansi-regex "^6.0.1" - strip-bom@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/strip-bom/-/strip-bom-3.0.0.tgz#2334c18e9c759f7bdd56fdef7e9ae3d588e68ed3"