From 6a17541713872cef10fb08ca62157c35203efd48 Mon Sep 17 00:00:00 2001 From: Marcel Mraz Date: Sat, 21 Dec 2024 00:27:22 +0100 Subject: [PATCH] Auto-reconnecting WS client --- excalidraw-app/App.tsx | 66 ++++---- excalidraw-app/data/LocalData.ts | 65 ++++---- packages/excalidraw/package.json | 1 + packages/excalidraw/sync/client.ts | 259 +++++++++-------------------- packages/excalidraw/sync/queue.ts | 75 +++++++++ 5 files changed, 224 insertions(+), 242 deletions(-) create mode 100644 packages/excalidraw/sync/queue.ts diff --git a/excalidraw-app/App.tsx b/excalidraw-app/App.tsx index 282c8252c8..ca3bb47157 100644 --- a/excalidraw-app/App.tsx +++ b/excalidraw-app/App.tsx @@ -26,6 +26,7 @@ import { TTDDialogTrigger, StoreAction, reconcileElements, + newElementWith, } from "../packages/excalidraw"; import type { AppState, @@ -384,7 +385,7 @@ const ExcalidrawWrapper = () => { setAcknowledgedIncrements([...(syncAPI?.acknowledgedIncrements ?? [])]); }, 250); - syncAPI?.reconnect(); + syncAPI?.connect(); return () => { syncAPI?.disconnect(); @@ -647,36 +648,35 @@ const ExcalidrawWrapper = () => { // this check is redundant, but since this is a hot path, it's best // not to evaludate the nested expression every time - // CFDO: temporary - // if (!LocalData.isSavePaused()) { - // LocalData.save(elements, appState, files, () => { - // if (excalidrawAPI) { - // let didChange = false; + if (!LocalData.isSavePaused()) { + LocalData.save(elements, appState, files, () => { + if (excalidrawAPI) { + let didChange = false; - // const elements = excalidrawAPI - // .getSceneElementsIncludingDeleted() - // .map((element) => { - // if ( - // LocalData.fileStorage.shouldUpdateImageElementStatus(element) - // ) { - // const newElement = newElementWith(element, { status: "saved" }); - // if (newElement !== element) { - // didChange = true; - // } - // return newElement; - // } - // return element; - // }); + const elements = excalidrawAPI + .getSceneElementsIncludingDeleted() + .map((element) => { + if ( + LocalData.fileStorage.shouldUpdateImageElementStatus(element) + ) { + const newElement = newElementWith(element, { status: "saved" }); + if (newElement !== element) { + didChange = true; + } + return newElement; + } + return element; + }); - // if (didChange) { - // excalidrawAPI.updateScene({ - // elements, - // storeAction: StoreAction.UPDATE, - // }); - // } - // } - // }); - // } + if (didChange) { + excalidrawAPI.updateScene({ + elements, + storeAction: StoreAction.UPDATE, + }); + } + } + }); + } // Render the debug scene if the debug canvas is available if (debugCanvasRef.current && excalidrawAPI) { @@ -885,6 +885,14 @@ const ExcalidrawWrapper = () => { max={acknowledgedIncrements.length} value={nextVersion === -1 ? acknowledgedIncrements.length : nextVersion} onChange={(value) => { + if (value !== acknowledgedIncrements.length - 1) { + // don't listen to updates in the detached mode + syncAPI?.disconnect(); + } else { + // reconnect once we're back to the latest version + syncAPI?.connect(); + } + setNextVersion(value as number); debouncedTimeTravel(value as number); }} diff --git a/excalidraw-app/data/LocalData.ts b/excalidraw-app/data/LocalData.ts index 1248038ee3..8091905fa2 100644 --- a/excalidraw-app/data/LocalData.ts +++ b/excalidraw-app/data/LocalData.ts @@ -69,35 +69,34 @@ class LocalFileManager extends FileManager { }; } -// CFDO: temporary -// const saveDataStateToLocalStorage = ( -// elements: readonly ExcalidrawElement[], -// appState: AppState, -// ) => { -// try { -// const _appState = clearAppStateForLocalStorage(appState); +const saveDataStateToLocalStorage = ( + elements: readonly ExcalidrawElement[], + appState: AppState, +) => { + try { + const _appState = clearAppStateForLocalStorage(appState); -// if ( -// _appState.openSidebar?.name === DEFAULT_SIDEBAR.name && -// _appState.openSidebar.tab === CANVAS_SEARCH_TAB -// ) { -// _appState.openSidebar = null; -// } + if ( + _appState.openSidebar?.name === DEFAULT_SIDEBAR.name && + _appState.openSidebar.tab === CANVAS_SEARCH_TAB + ) { + _appState.openSidebar = null; + } -// localStorage.setItem( -// STORAGE_KEYS.LOCAL_STORAGE_ELEMENTS, -// JSON.stringify(clearElementsForLocalStorage(elements)), -// ); -// localStorage.setItem( -// STORAGE_KEYS.LOCAL_STORAGE_APP_STATE, -// JSON.stringify(_appState), -// ); -// updateBrowserStateVersion(STORAGE_KEYS.VERSION_DATA_STATE); -// } catch (error: any) { -// // Unable to access window.localStorage -// console.error(error); -// } -// }; + localStorage.setItem( + STORAGE_KEYS.LOCAL_STORAGE_ELEMENTS, + JSON.stringify(clearElementsForLocalStorage(elements)), + ); + localStorage.setItem( + STORAGE_KEYS.LOCAL_STORAGE_APP_STATE, + JSON.stringify(_appState), + ); + updateBrowserStateVersion(STORAGE_KEYS.VERSION_DATA_STATE); + } catch (error: any) { + // Unable to access window.localStorage + console.error(error); + } +}; type SavingLockTypes = "collaboration"; @@ -109,12 +108,12 @@ export class LocalData { files: BinaryFiles, onFilesSaved: () => void, ) => { - // saveDataStateToLocalStorage(elements, appState); - // await this.fileStorage.saveFiles({ - // elements, - // files, - // }); - // onFilesSaved(); + saveDataStateToLocalStorage(elements, appState); + await this.fileStorage.saveFiles({ + elements, + files, + }); + onFilesSaved(); }, SAVE_TO_LOCAL_STORAGE_TIMEOUT, ); diff --git a/packages/excalidraw/package.json b/packages/excalidraw/package.json index fc8b3b3221..e2a13ae61e 100644 --- a/packages/excalidraw/package.json +++ b/packages/excalidraw/package.json @@ -86,6 +86,7 @@ "png-chunks-extract": "1.0.0", "points-on-curve": "1.0.1", "pwacompat": "2.0.17", + "reconnecting-websocket": "4.4.0", "roughjs": "4.6.4", "sass": "1.51.0", "tunnel-rat": "0.1.2" diff --git a/packages/excalidraw/sync/client.ts b/packages/excalidraw/sync/client.ts index 10b825244d..9bab1c8ecd 100644 --- a/packages/excalidraw/sync/client.ts +++ b/packages/excalidraw/sync/client.ts @@ -1,7 +1,16 @@ /* eslint-disable no-console */ -import throttle from "lodash.throttle"; import debounce from "lodash.debounce"; +import throttle from "lodash.throttle"; +import ReconnectingWebSocket, { + type Event, + type CloseEvent, +} from "reconnecting-websocket"; import { Utils } from "./utils"; +import { + SyncQueue, + type MetadataRepository, + type IncrementsRepository, +} from "./queue"; import { StoreIncrement } from "../store"; import type { ExcalidrawImperativeAPI } from "../types"; import type { SceneElementsMap } from "../element/types"; @@ -11,78 +20,8 @@ import type { SERVER_INCREMENT, } from "./protocol"; -interface IncrementsRepository { - loadIncrements(): Promise<{ increments: Array } | null>; - saveIncrements(params: { increments: Array }): Promise; -} - -interface MetadataRepository { - loadMetadata(): Promise<{ lastAcknowledgedVersion: number } | null>; - saveMetadata(metadata: { lastAcknowledgedVersion: number }): Promise; -} - -// CFDO: make sure the increments are always acknowledged (deleted from the repository) -export class SyncQueue { - private readonly queue: Map; - private readonly repository: IncrementsRepository; - - private constructor( - queue: Map = new Map(), - repository: IncrementsRepository, - ) { - this.queue = queue; - this.repository = repository; - } - - public static async create(repository: IncrementsRepository) { - const data = await repository.loadIncrements(); - - return new SyncQueue( - new Map(data?.increments?.map((increment) => [increment.id, increment])), - repository, - ); - } - - public getAll() { - return Array.from(this.queue.values()); - } - - public get(id: StoreIncrement["id"]) { - return this.queue.get(id); - } - - public has(id: StoreIncrement["id"]) { - return this.queue.has(id); - } - - public add(...increments: StoreIncrement[]) { - for (const increment of increments) { - this.queue.set(increment.id, increment); - } - - this.persist(); - } - - public remove(...ids: StoreIncrement["id"][]) { - for (const id of ids) { - this.queue.delete(id); - } - - this.persist(); - } - - public persist = throttle( - async () => { - try { - await this.repository.saveIncrements({ increments: this.getAll() }); - } catch (e) { - console.error("Failed to persist the sync queue:", e); - } - }, - 1000, - { leading: false, trailing: true }, - ); -} +const NO_STATUS_RECEIVED_ERROR_CODE = 1005; +const ABNORMAL_CLOSURE_ERROR_CODE = 1006; export class SyncClient { private static readonly HOST_URL = import.meta.env.DEV @@ -93,8 +32,7 @@ export class SyncClient { ? "test_room_x" : "test_room_prod"; - private static readonly RECONNECT_INTERVAL = 10_000; - + private server: ReconnectingWebSocket | null = null; private readonly api: ExcalidrawImperativeAPI; private readonly queue: SyncQueue; private readonly repository: MetadataRepository; @@ -120,13 +58,6 @@ export class SyncClient { this.repository.saveMetadata({ lastAcknowledgedVersion: version }); } - private server: WebSocket | null = null; - private get isConnected() { - return this.server?.readyState === WebSocket.OPEN; - } - - private isConnecting: { done: (error?: Error) => void } | null = null; - private constructor( api: ExcalidrawImperativeAPI, repository: MetadataRepository, @@ -156,117 +87,83 @@ export class SyncClient { }); } - // CFDO: throttle does not work that well here (after some period it tries to reconnect too often) - public reconnect = throttle( - async () => { - try { - if (this.isConnected) { - console.debug("Already connected to the sync server."); - return; - } - - if (this.isConnecting !== null) { - console.debug("Already reconnecting to the sync server..."); - return; - } - - console.info("Reconnecting to the sync server..."); - - const isConnecting = { - done: () => {}, - }; - - // ensure there won't be multiple reconnection attempts - this.isConnecting = isConnecting; - - return await new Promise((resolve, reject) => { - this.server = new WebSocket( - `${SyncClient.HOST_URL}/connect?roomId=${this.roomId}`, - ); - - // wait for 10 seconds before timing out - const timeoutId = setTimeout(() => { - reject("Connecting the sync server timed out"); - }, 10_000); - - // resolved when opened, rejected on error - isConnecting.done = (error?: Error) => { - clearTimeout(timeoutId); - - if (error) { - reject(error); - } else { - resolve(); - } - }; - - this.server.addEventListener("message", this.onMessage); - this.server.addEventListener("close", this.onClose); - this.server.addEventListener("error", this.onError); - this.server.addEventListener("open", this.onOpen); - }); - } catch (e) { - console.error("Failed to connect to sync server:", e); - this.disconnect(e as Error); + // CFDO I: throttle does not work that well here (after some period it tries to reconnect too often) + public connect = throttle( + () => { + if (this.server && this.server.readyState !== this.server.CLOSED) { + return; } + + console.log("Connecting to the sync server..."); + this.server = new ReconnectingWebSocket( + `${SyncClient.HOST_URL}/connect?roomId=${this.roomId}`, + [], + { + WebSocket: undefined, // WebSocket constructor, if none provided, defaults to global WebSocket + maxReconnectionDelay: 10000, // max delay in ms between reconnections + minReconnectionDelay: 1000, // min delay in ms between reconnections + reconnectionDelayGrowFactor: 1.3, // how fast the reconnection delay grows + minUptime: 5000, // min time in ms to consider connection as stable + connectionTimeout: 4000, // retry connect if not connected after this time, in ms + maxRetries: Infinity, // maximum number of retries + maxEnqueuedMessages: Infinity, // maximum number of messages to buffer until reconnection + startClosed: false, // start websocket in CLOSED state, call `.reconnect()` to connect + debug: false, // enables debug output, + }, + ); + this.server.addEventListener("message", this.onMessage); + this.server.addEventListener("open", this.onOpen); + this.server.addEventListener("close", this.onClose); + this.server.addEventListener("error", this.onError); }, - SyncClient.RECONNECT_INTERVAL, - { leading: true }, + 1000, + { leading: true, trailing: false }, ); public disconnect = throttle( - (error?: Error) => { - try { - this.server?.removeEventListener("message", this.onMessage); - this.server?.removeEventListener("close", this.onClose); - this.server?.removeEventListener("error", this.onError); - this.server?.removeEventListener("open", this.onOpen); - this.server?.close(); - - if (error) { - this.isConnecting?.done(error); - } - } finally { - this.isConnecting = null; - this.server = null; - this.reconnect(); + (code?: number, reason?: string) => { + if (!this.server) { + return; } + + if ( + this.server.readyState === this.server.CLOSED || + this.server.readyState === this.server.CLOSING + ) { + return; + } + + console.log( + `Disconnecting from the sync server with code "${code}" and reason "${reason}"...`, + ); + this.server.removeEventListener("message", this.onMessage); + this.server.removeEventListener("open", this.onOpen); + this.server.removeEventListener("close", this.onClose); + this.server.removeEventListener("error", this.onError); + this.server.close(code, reason); }, - SyncClient.RECONNECT_INTERVAL, - { leading: true }, + 1000, + { leading: true, trailing: false }, ); - private onOpen = async () => { - if (!this.isConnected) { - throw new Error( - "Received open event, but the connection is still not ready.", - ); - } - - if (!this.isConnecting) { - throw new Error( - "Can't resolve connection without `isConnecting` callback.", - ); - } - - // resolve the current connection - this.isConnecting.done(); - + private onOpen = (event: Event) => { // CFDO: hack to pull everything for on init this.pull(0); }; private onClose = (event: CloseEvent) => { - console.log("close", event); this.disconnect( - new Error(`Received "${event.type}" event on the sync connection`), + event.code || NO_STATUS_RECEIVED_ERROR_CODE, + event.reason || "Connection closed without a reason", ); }; private onError = (event: Event) => { - console.log("error", event); this.disconnect( - new Error(`Received "${event.type}" on the sync connection`), + event.type === "error" + ? ABNORMAL_CLOSURE_ERROR_CODE + : NO_STATUS_RECEIVED_ERROR_CODE, + `Received "${event.type}" on the sync connection`, ); }; @@ -331,10 +228,10 @@ export class SyncClient { } // CFDO: should be flushed once regular push / pull goes through - private debouncedPush = (ms: number = 1000) => + private schedulePush = (ms: number = 1000) => debounce(this.push, ms, { leading: true, trailing: false }); - private debouncedPull = (ms: number = 1000) => + private schedulePull = (ms: number = 1000) => debounce(this.pull, ms, { leading: true, trailing: false }); // CFDO: refactor by applying all operations to store, not to the elements @@ -404,11 +301,11 @@ export class SyncClient { this.lastAcknowledgedVersion = nextAcknowledgedVersion; } catch (e) { console.error("Failed to apply acknowledged increments:", e); - this.debouncedPull().call(this); + this.schedulePull().call(this); return; } - this.debouncedPush().call(this); + this.schedulePush().call(this); } private handleRejected(payload: { ids: Array; message: string }) { @@ -422,10 +319,12 @@ export class SyncClient { } private send(message: { type: string; payload: any }): void { - if (!this.isConnected) { - throw new Error("Can't send a message without an active connection!"); + if (!this.server) { + throw new Error( + "Can't send a message without an established connection!", + ); } - this.server?.send(JSON.stringify(message)); + this.server.send(JSON.stringify(message)); } } diff --git a/packages/excalidraw/sync/queue.ts b/packages/excalidraw/sync/queue.ts new file mode 100644 index 0000000000..f441b7c46f --- /dev/null +++ b/packages/excalidraw/sync/queue.ts @@ -0,0 +1,75 @@ +import throttle from "lodash.throttle"; +import type { StoreIncrement } from "../store"; + +export interface IncrementsRepository { + loadIncrements(): Promise<{ increments: Array } | null>; + saveIncrements(params: { increments: Array }): Promise; +} + +export interface MetadataRepository { + loadMetadata(): Promise<{ lastAcknowledgedVersion: number } | null>; + saveMetadata(metadata: { lastAcknowledgedVersion: number }): Promise; +} + +// CFDO: make sure the increments are always acknowledged (deleted from the repository) +export class SyncQueue { + private readonly queue: Map; + private readonly repository: IncrementsRepository; + + private constructor( + queue: Map = new Map(), + repository: IncrementsRepository, + ) { + this.queue = queue; + this.repository = repository; + } + + public static async create(repository: IncrementsRepository) { + const data = await repository.loadIncrements(); + + return new SyncQueue( + new Map(data?.increments?.map((increment) => [increment.id, increment])), + repository, + ); + } + + public getAll() { + return Array.from(this.queue.values()); + } + + public get(id: StoreIncrement["id"]) { + return this.queue.get(id); + } + + public has(id: StoreIncrement["id"]) { + return this.queue.has(id); + } + + public add(...increments: StoreIncrement[]) { + for (const increment of increments) { + this.queue.set(increment.id, increment); + } + + this.persist(); + } + + public remove(...ids: StoreIncrement["id"][]) { + for (const id of ids) { + this.queue.delete(id); + } + + this.persist(); + } + + public persist = throttle( + async () => { + try { + await this.repository.saveIncrements({ increments: this.getAll() }); + } catch (e) { + console.error("Failed to persist the sync queue:", e); + } + }, + 1000, + { leading: false, trailing: true }, + ); +} \ No newline at end of file