Auto-reconnecting WS client

This commit is contained in:
Marcel Mraz 2024-12-21 00:27:22 +01:00
parent 040a57f56a
commit 6a17541713
No known key found for this signature in database
GPG key ID: 4EBD6E62DC830CD2
5 changed files with 224 additions and 242 deletions

View file

@ -26,6 +26,7 @@ import {
TTDDialogTrigger, TTDDialogTrigger,
StoreAction, StoreAction,
reconcileElements, reconcileElements,
newElementWith,
} from "../packages/excalidraw"; } from "../packages/excalidraw";
import type { import type {
AppState, AppState,
@ -384,7 +385,7 @@ const ExcalidrawWrapper = () => {
setAcknowledgedIncrements([...(syncAPI?.acknowledgedIncrements ?? [])]); setAcknowledgedIncrements([...(syncAPI?.acknowledgedIncrements ?? [])]);
}, 250); }, 250);
syncAPI?.reconnect(); syncAPI?.connect();
return () => { return () => {
syncAPI?.disconnect(); syncAPI?.disconnect();
@ -647,36 +648,35 @@ const ExcalidrawWrapper = () => {
// this check is redundant, but since this is a hot path, it's best // this check is redundant, but since this is a hot path, it's best
// not to evaludate the nested expression every time // not to evaludate the nested expression every time
// CFDO: temporary if (!LocalData.isSavePaused()) {
// if (!LocalData.isSavePaused()) { LocalData.save(elements, appState, files, () => {
// LocalData.save(elements, appState, files, () => { if (excalidrawAPI) {
// if (excalidrawAPI) { let didChange = false;
// let didChange = false;
// const elements = excalidrawAPI const elements = excalidrawAPI
// .getSceneElementsIncludingDeleted() .getSceneElementsIncludingDeleted()
// .map((element) => { .map((element) => {
// if ( if (
// LocalData.fileStorage.shouldUpdateImageElementStatus(element) LocalData.fileStorage.shouldUpdateImageElementStatus(element)
// ) { ) {
// const newElement = newElementWith(element, { status: "saved" }); const newElement = newElementWith(element, { status: "saved" });
// if (newElement !== element) { if (newElement !== element) {
// didChange = true; didChange = true;
// } }
// return newElement; return newElement;
// } }
// return element; return element;
// }); });
// if (didChange) { if (didChange) {
// excalidrawAPI.updateScene({ excalidrawAPI.updateScene({
// elements, elements,
// storeAction: StoreAction.UPDATE, storeAction: StoreAction.UPDATE,
// }); });
// } }
// } }
// }); });
// } }
// Render the debug scene if the debug canvas is available // Render the debug scene if the debug canvas is available
if (debugCanvasRef.current && excalidrawAPI) { if (debugCanvasRef.current && excalidrawAPI) {
@ -885,6 +885,14 @@ const ExcalidrawWrapper = () => {
max={acknowledgedIncrements.length} max={acknowledgedIncrements.length}
value={nextVersion === -1 ? acknowledgedIncrements.length : nextVersion} value={nextVersion === -1 ? acknowledgedIncrements.length : nextVersion}
onChange={(value) => { onChange={(value) => {
if (value !== acknowledgedIncrements.length - 1) {
// don't listen to updates in the detached mode
syncAPI?.disconnect();
} else {
// reconnect once we're back to the latest version
syncAPI?.connect();
}
setNextVersion(value as number); setNextVersion(value as number);
debouncedTimeTravel(value as number); debouncedTimeTravel(value as number);
}} }}

View file

@ -69,35 +69,34 @@ class LocalFileManager extends FileManager {
}; };
} }
// CFDO: temporary const saveDataStateToLocalStorage = (
// const saveDataStateToLocalStorage = ( elements: readonly ExcalidrawElement[],
// elements: readonly ExcalidrawElement[], appState: AppState,
// appState: AppState, ) => {
// ) => { try {
// try { const _appState = clearAppStateForLocalStorage(appState);
// const _appState = clearAppStateForLocalStorage(appState);
// if ( if (
// _appState.openSidebar?.name === DEFAULT_SIDEBAR.name && _appState.openSidebar?.name === DEFAULT_SIDEBAR.name &&
// _appState.openSidebar.tab === CANVAS_SEARCH_TAB _appState.openSidebar.tab === CANVAS_SEARCH_TAB
// ) { ) {
// _appState.openSidebar = null; _appState.openSidebar = null;
// } }
// localStorage.setItem( localStorage.setItem(
// STORAGE_KEYS.LOCAL_STORAGE_ELEMENTS, STORAGE_KEYS.LOCAL_STORAGE_ELEMENTS,
// JSON.stringify(clearElementsForLocalStorage(elements)), JSON.stringify(clearElementsForLocalStorage(elements)),
// ); );
// localStorage.setItem( localStorage.setItem(
// STORAGE_KEYS.LOCAL_STORAGE_APP_STATE, STORAGE_KEYS.LOCAL_STORAGE_APP_STATE,
// JSON.stringify(_appState), JSON.stringify(_appState),
// ); );
// updateBrowserStateVersion(STORAGE_KEYS.VERSION_DATA_STATE); updateBrowserStateVersion(STORAGE_KEYS.VERSION_DATA_STATE);
// } catch (error: any) { } catch (error: any) {
// // Unable to access window.localStorage // Unable to access window.localStorage
// console.error(error); console.error(error);
// } }
// }; };
type SavingLockTypes = "collaboration"; type SavingLockTypes = "collaboration";
@ -109,12 +108,12 @@ export class LocalData {
files: BinaryFiles, files: BinaryFiles,
onFilesSaved: () => void, onFilesSaved: () => void,
) => { ) => {
// saveDataStateToLocalStorage(elements, appState); saveDataStateToLocalStorage(elements, appState);
// await this.fileStorage.saveFiles({ await this.fileStorage.saveFiles({
// elements, elements,
// files, files,
// }); });
// onFilesSaved(); onFilesSaved();
}, },
SAVE_TO_LOCAL_STORAGE_TIMEOUT, SAVE_TO_LOCAL_STORAGE_TIMEOUT,
); );

View file

@ -86,6 +86,7 @@
"png-chunks-extract": "1.0.0", "png-chunks-extract": "1.0.0",
"points-on-curve": "1.0.1", "points-on-curve": "1.0.1",
"pwacompat": "2.0.17", "pwacompat": "2.0.17",
"reconnecting-websocket": "4.4.0",
"roughjs": "4.6.4", "roughjs": "4.6.4",
"sass": "1.51.0", "sass": "1.51.0",
"tunnel-rat": "0.1.2" "tunnel-rat": "0.1.2"

View file

@ -1,7 +1,16 @@
/* eslint-disable no-console */ /* eslint-disable no-console */
import throttle from "lodash.throttle";
import debounce from "lodash.debounce"; import debounce from "lodash.debounce";
import throttle from "lodash.throttle";
import ReconnectingWebSocket, {
type Event,
type CloseEvent,
} from "reconnecting-websocket";
import { Utils } from "./utils"; import { Utils } from "./utils";
import {
SyncQueue,
type MetadataRepository,
type IncrementsRepository,
} from "./queue";
import { StoreIncrement } from "../store"; import { StoreIncrement } from "../store";
import type { ExcalidrawImperativeAPI } from "../types"; import type { ExcalidrawImperativeAPI } from "../types";
import type { SceneElementsMap } from "../element/types"; import type { SceneElementsMap } from "../element/types";
@ -11,78 +20,8 @@ import type {
SERVER_INCREMENT, SERVER_INCREMENT,
} from "./protocol"; } from "./protocol";
interface IncrementsRepository { const NO_STATUS_RECEIVED_ERROR_CODE = 1005;
loadIncrements(): Promise<{ increments: Array<StoreIncrement> } | null>; const ABNORMAL_CLOSURE_ERROR_CODE = 1006;
saveIncrements(params: { increments: Array<StoreIncrement> }): Promise<void>;
}
interface MetadataRepository {
loadMetadata(): Promise<{ lastAcknowledgedVersion: number } | null>;
saveMetadata(metadata: { lastAcknowledgedVersion: number }): Promise<void>;
}
// CFDO: make sure the increments are always acknowledged (deleted from the repository)
export class SyncQueue {
private readonly queue: Map<string, StoreIncrement>;
private readonly repository: IncrementsRepository;
private constructor(
queue: Map<string, StoreIncrement> = new Map(),
repository: IncrementsRepository,
) {
this.queue = queue;
this.repository = repository;
}
public static async create(repository: IncrementsRepository) {
const data = await repository.loadIncrements();
return new SyncQueue(
new Map(data?.increments?.map((increment) => [increment.id, increment])),
repository,
);
}
public getAll() {
return Array.from(this.queue.values());
}
public get(id: StoreIncrement["id"]) {
return this.queue.get(id);
}
public has(id: StoreIncrement["id"]) {
return this.queue.has(id);
}
public add(...increments: StoreIncrement[]) {
for (const increment of increments) {
this.queue.set(increment.id, increment);
}
this.persist();
}
public remove(...ids: StoreIncrement["id"][]) {
for (const id of ids) {
this.queue.delete(id);
}
this.persist();
}
public persist = throttle(
async () => {
try {
await this.repository.saveIncrements({ increments: this.getAll() });
} catch (e) {
console.error("Failed to persist the sync queue:", e);
}
},
1000,
{ leading: false, trailing: true },
);
}
export class SyncClient { export class SyncClient {
private static readonly HOST_URL = import.meta.env.DEV private static readonly HOST_URL = import.meta.env.DEV
@ -93,8 +32,7 @@ export class SyncClient {
? "test_room_x" ? "test_room_x"
: "test_room_prod"; : "test_room_prod";
private static readonly RECONNECT_INTERVAL = 10_000; private server: ReconnectingWebSocket | null = null;
private readonly api: ExcalidrawImperativeAPI; private readonly api: ExcalidrawImperativeAPI;
private readonly queue: SyncQueue; private readonly queue: SyncQueue;
private readonly repository: MetadataRepository; private readonly repository: MetadataRepository;
@ -120,13 +58,6 @@ export class SyncClient {
this.repository.saveMetadata({ lastAcknowledgedVersion: version }); this.repository.saveMetadata({ lastAcknowledgedVersion: version });
} }
private server: WebSocket | null = null;
private get isConnected() {
return this.server?.readyState === WebSocket.OPEN;
}
private isConnecting: { done: (error?: Error) => void } | null = null;
private constructor( private constructor(
api: ExcalidrawImperativeAPI, api: ExcalidrawImperativeAPI,
repository: MetadataRepository, repository: MetadataRepository,
@ -156,117 +87,83 @@ export class SyncClient {
}); });
} }
// CFDO: throttle does not work that well here (after some period it tries to reconnect too often) // CFDO I: throttle does not work that well here (after some period it tries to reconnect too often)
public reconnect = throttle( public connect = throttle(
async () => { () => {
try { if (this.server && this.server.readyState !== this.server.CLOSED) {
if (this.isConnected) {
console.debug("Already connected to the sync server.");
return; return;
} }
if (this.isConnecting !== null) { console.log("Connecting to the sync server...");
console.debug("Already reconnecting to the sync server..."); this.server = new ReconnectingWebSocket(
return;
}
console.info("Reconnecting to the sync server...");
const isConnecting = {
done: () => {},
};
// ensure there won't be multiple reconnection attempts
this.isConnecting = isConnecting;
return await new Promise<void>((resolve, reject) => {
this.server = new WebSocket(
`${SyncClient.HOST_URL}/connect?roomId=${this.roomId}`, `${SyncClient.HOST_URL}/connect?roomId=${this.roomId}`,
[],
{
WebSocket: undefined, // WebSocket constructor, if none provided, defaults to global WebSocket
maxReconnectionDelay: 10000, // max delay in ms between reconnections
minReconnectionDelay: 1000, // min delay in ms between reconnections
reconnectionDelayGrowFactor: 1.3, // how fast the reconnection delay grows
minUptime: 5000, // min time in ms to consider connection as stable
connectionTimeout: 4000, // retry connect if not connected after this time, in ms
maxRetries: Infinity, // maximum number of retries
maxEnqueuedMessages: Infinity, // maximum number of messages to buffer until reconnection
startClosed: false, // start websocket in CLOSED state, call `.reconnect()` to connect
debug: false, // enables debug output,
},
); );
// wait for 10 seconds before timing out
const timeoutId = setTimeout(() => {
reject("Connecting the sync server timed out");
}, 10_000);
// resolved when opened, rejected on error
isConnecting.done = (error?: Error) => {
clearTimeout(timeoutId);
if (error) {
reject(error);
} else {
resolve();
}
};
this.server.addEventListener("message", this.onMessage); this.server.addEventListener("message", this.onMessage);
this.server.addEventListener("open", this.onOpen);
this.server.addEventListener("close", this.onClose); this.server.addEventListener("close", this.onClose);
this.server.addEventListener("error", this.onError); this.server.addEventListener("error", this.onError);
this.server.addEventListener("open", this.onOpen);
});
} catch (e) {
console.error("Failed to connect to sync server:", e);
this.disconnect(e as Error);
}
}, },
SyncClient.RECONNECT_INTERVAL, 1000,
{ leading: true }, { leading: true, trailing: false },
); );
public disconnect = throttle( public disconnect = throttle(
(error?: Error) => { (code?: number, reason?: string) => {
try { if (!this.server) {
this.server?.removeEventListener("message", this.onMessage); return;
this.server?.removeEventListener("close", this.onClose); }
this.server?.removeEventListener("error", this.onError);
this.server?.removeEventListener("open", this.onOpen);
this.server?.close();
if (error) { if (
this.isConnecting?.done(error); this.server.readyState === this.server.CLOSED ||
} this.server.readyState === this.server.CLOSING
} finally { ) {
this.isConnecting = null; return;
this.server = null;
this.reconnect();
} }
console.log(
`Disconnecting from the sync server with code "${code}" and reason "${reason}"...`,
);
this.server.removeEventListener("message", this.onMessage);
this.server.removeEventListener("open", this.onOpen);
this.server.removeEventListener("close", this.onClose);
this.server.removeEventListener("error", this.onError);
this.server.close(code, reason);
}, },
SyncClient.RECONNECT_INTERVAL, 1000,
{ leading: true }, { leading: true, trailing: false },
); );
private onOpen = async () => { private onOpen = (event: Event) => {
if (!this.isConnected) {
throw new Error(
"Received open event, but the connection is still not ready.",
);
}
if (!this.isConnecting) {
throw new Error(
"Can't resolve connection without `isConnecting` callback.",
);
}
// resolve the current connection
this.isConnecting.done();
// CFDO: hack to pull everything for on init // CFDO: hack to pull everything for on init
this.pull(0); this.pull(0);
}; };
private onClose = (event: CloseEvent) => { private onClose = (event: CloseEvent) => {
console.log("close", event);
this.disconnect( this.disconnect(
new Error(`Received "${event.type}" event on the sync connection`), event.code || NO_STATUS_RECEIVED_ERROR_CODE,
event.reason || "Connection closed without a reason",
); );
}; };
private onError = (event: Event) => { private onError = (event: Event) => {
console.log("error", event);
this.disconnect( this.disconnect(
new Error(`Received "${event.type}" on the sync connection`), event.type === "error"
? ABNORMAL_CLOSURE_ERROR_CODE
: NO_STATUS_RECEIVED_ERROR_CODE,
`Received "${event.type}" on the sync connection`,
); );
}; };
@ -331,10 +228,10 @@ export class SyncClient {
} }
// CFDO: should be flushed once regular push / pull goes through // CFDO: should be flushed once regular push / pull goes through
private debouncedPush = (ms: number = 1000) => private schedulePush = (ms: number = 1000) =>
debounce(this.push, ms, { leading: true, trailing: false }); debounce(this.push, ms, { leading: true, trailing: false });
private debouncedPull = (ms: number = 1000) => private schedulePull = (ms: number = 1000) =>
debounce(this.pull, ms, { leading: true, trailing: false }); debounce(this.pull, ms, { leading: true, trailing: false });
// CFDO: refactor by applying all operations to store, not to the elements // CFDO: refactor by applying all operations to store, not to the elements
@ -404,11 +301,11 @@ export class SyncClient {
this.lastAcknowledgedVersion = nextAcknowledgedVersion; this.lastAcknowledgedVersion = nextAcknowledgedVersion;
} catch (e) { } catch (e) {
console.error("Failed to apply acknowledged increments:", e); console.error("Failed to apply acknowledged increments:", e);
this.debouncedPull().call(this); this.schedulePull().call(this);
return; return;
} }
this.debouncedPush().call(this); this.schedulePush().call(this);
} }
private handleRejected(payload: { ids: Array<string>; message: string }) { private handleRejected(payload: { ids: Array<string>; message: string }) {
@ -422,10 +319,12 @@ export class SyncClient {
} }
private send(message: { type: string; payload: any }): void { private send(message: { type: string; payload: any }): void {
if (!this.isConnected) { if (!this.server) {
throw new Error("Can't send a message without an active connection!"); throw new Error(
"Can't send a message without an established connection!",
);
} }
this.server?.send(JSON.stringify(message)); this.server.send(JSON.stringify(message));
} }
} }

View file

@ -0,0 +1,75 @@
import throttle from "lodash.throttle";
import type { StoreIncrement } from "../store";
export interface IncrementsRepository {
loadIncrements(): Promise<{ increments: Array<StoreIncrement> } | null>;
saveIncrements(params: { increments: Array<StoreIncrement> }): Promise<void>;
}
export interface MetadataRepository {
loadMetadata(): Promise<{ lastAcknowledgedVersion: number } | null>;
saveMetadata(metadata: { lastAcknowledgedVersion: number }): Promise<void>;
}
// CFDO: make sure the increments are always acknowledged (deleted from the repository)
export class SyncQueue {
private readonly queue: Map<string, StoreIncrement>;
private readonly repository: IncrementsRepository;
private constructor(
queue: Map<string, StoreIncrement> = new Map(),
repository: IncrementsRepository,
) {
this.queue = queue;
this.repository = repository;
}
public static async create(repository: IncrementsRepository) {
const data = await repository.loadIncrements();
return new SyncQueue(
new Map(data?.increments?.map((increment) => [increment.id, increment])),
repository,
);
}
public getAll() {
return Array.from(this.queue.values());
}
public get(id: StoreIncrement["id"]) {
return this.queue.get(id);
}
public has(id: StoreIncrement["id"]) {
return this.queue.has(id);
}
public add(...increments: StoreIncrement[]) {
for (const increment of increments) {
this.queue.set(increment.id, increment);
}
this.persist();
}
public remove(...ids: StoreIncrement["id"][]) {
for (const id of ids) {
this.queue.delete(id);
}
this.persist();
}
public persist = throttle(
async () => {
try {
await this.repository.saveIncrements({ increments: this.getAll() });
} catch (e) {
console.error("Failed to persist the sync queue:", e);
}
},
1000,
{ leading: false, trailing: true },
);
}