diff --git a/excalidraw-app/App.tsx b/excalidraw-app/App.tsx index 369408e731..6fe1921f1d 100644 --- a/excalidraw-app/App.tsx +++ b/excalidraw-app/App.tsx @@ -56,7 +56,7 @@ import Collab, { collabAPIAtom, isCollaboratingAtom, isOfflineAtom, - syncAPIAtom, + syncApiAtom, } from "./collab/Collab"; import { exportToBackend, @@ -139,7 +139,6 @@ import type { ElementsChange } from "../packages/excalidraw/change"; import Slider from "rc-slider"; import "rc-slider/assets/index.css"; -import { SyncClient } from "../packages/excalidraw/sync/client"; polyfill(); @@ -370,7 +369,7 @@ const ExcalidrawWrapper = () => { const [, setShareDialogState] = useAtom(shareDialogStateAtom); const [collabAPI] = useAtom(collabAPIAtom); - const [syncAPI] = useAtom(syncAPIAtom); + const [syncAPI] = useAtom(syncApiAtom); const [nextVersion, setNextVersion] = useState(-1); const currentVersion = useRef(-1); const [acknowledgedIncrements, setAcknowledgedIncrements] = useState< @@ -389,7 +388,7 @@ const ExcalidrawWrapper = () => { syncAPI?.connect(); return () => { - syncAPI?.disconnect(SyncClient.NORMAL_CLOSURE); + syncAPI?.disconnect(); clearInterval(interval); }; }, [syncAPI]); @@ -890,7 +889,7 @@ const ExcalidrawWrapper = () => { // CFDO: in safari the whole canvas gets selected when dragging if (value !== acknowledgedIncrements.length) { // don't listen to updates in the detached mode - syncAPI?.disconnect(SyncClient.NORMAL_CLOSURE); + syncAPI?.disconnect(); } else { // reconnect once we're back to the latest version syncAPI?.connect(); diff --git a/excalidraw-app/collab/Collab.tsx b/excalidraw-app/collab/Collab.tsx index b8c42d2124..931659476e 100644 --- a/excalidraw-app/collab/Collab.tsx +++ b/excalidraw-app/collab/Collab.tsx @@ -90,7 +90,7 @@ import type { } from "../../packages/excalidraw/data/reconcile"; import { SyncClient } from "../../packages/excalidraw/sync/client"; -export const syncAPIAtom = atom(null); +export const syncApiAtom = atom(null); export const collabAPIAtom = atom(null); export const isCollaboratingAtom = atom(false); export const isOfflineAtom = atom(false); @@ -239,7 +239,7 @@ class Collab extends PureComponent { SyncClient.create(this.excalidrawAPI, SyncIndexedDBAdapter).then( (syncAPI) => { - appJotaiStore.set(syncAPIAtom, syncAPI); + appJotaiStore.set(syncApiAtom, syncAPI); }, ); @@ -276,6 +276,8 @@ class Collab extends PureComponent { window.clearTimeout(this.idleTimeoutId); this.idleTimeoutId = null; } + + appJotaiStore.get(syncApiAtom)?.disconnect(); this.onUmmount?.(); } diff --git a/packages/excalidraw/cloudflare/repository.ts b/packages/excalidraw/cloudflare/repository.ts index 491db83087..3c20665af1 100644 --- a/packages/excalidraw/cloudflare/repository.ts +++ b/packages/excalidraw/cloudflare/repository.ts @@ -11,6 +11,7 @@ export class DurableIncrementsRepository implements IncrementsRepository { // this.storage.sql.exec(`DROP TABLE IF EXISTS increments;`); // #endregion + // CFDO: payload has just 2MB limit, which might not be enough this.storage.sql.exec(`CREATE TABLE IF NOT EXISTS increments( version INTEGER PRIMARY KEY AUTOINCREMENT, id TEXT NOT NULL UNIQUE, diff --git a/packages/excalidraw/sync/client.ts b/packages/excalidraw/sync/client.ts index 866b1fad56..9859851e35 100644 --- a/packages/excalidraw/sync/client.ts +++ b/packages/excalidraw/sync/client.ts @@ -15,10 +15,213 @@ import type { ExcalidrawImperativeAPI } from "../types"; import type { SceneElementsMap } from "../element/types"; import type { CLIENT_INCREMENT, + CLIENT_MESSAGE_RAW, PUSH_PAYLOAD, SERVER_INCREMENT, } from "./protocol"; import { debounce } from "../utils"; +import { randomId } from "../random"; + +class SocketMessage implements CLIENT_MESSAGE_RAW { + constructor( + public readonly type: "relay" | "pull" | "push", + public readonly payload: string, + public readonly chunkInfo: { + id: string; + order: number; + count: number; + } = { + id: "", + order: 0, + count: 1, + }, + ) {} +} + +class SocketClient { + // Max size for outgoing messages is 1MiB (due to CFDO limits), + // thus working with a slighter smaller limit of 800 kB (leaving 224kB for metadata) + private static readonly MAX_MESSAGE_SIZE = 800_000; + + private static readonly NORMAL_CLOSURE_CODE = 1000; + // Chrome throws "Uncaught InvalidAccessError" with message: + // "The close code must be either 1000, or between 3000 and 4999. 1009 is neither." + // therefore using custom codes instead. + private static readonly NO_STATUS_RECEIVED_ERROR_CODE = 3005; + private static readonly ABNORMAL_CLOSURE_ERROR_CODE = 3006; + private static readonly MESSAGE_IS_TOO_LARGE_ERROR_CODE = 3009; + + private isOffline = true; + private socket: ReconnectingWebSocket | null = null; + + private get isDisconnected() { + return !this.socket; + } + + constructor( + private readonly host: string, + private readonly roomId: String, + private readonly handlers: { + onOpen: (event: Event) => void; + onOnline: () => void; + onMessage: (event: MessageEvent) => void; + }, + ) {} + + private onOnline = () => { + this.isOffline = false; + this.handlers.onOnline(); + }; + + private onOffline = () => { + this.isOffline = true; + }; + + public connect = throttle( + () => { + if (!this.isDisconnected && !this.isOffline) { + return; + } + + window.addEventListener("online", this.onOnline); + window.addEventListener("offline", this.onOffline); + + console.debug("Connecting to the sync server..."); + this.socket = new ReconnectingWebSocket( + `${this.host}/connect?roomId=${this.roomId}`, + [], + { + WebSocket: undefined, // WebSocket constructor, if none provided, defaults to global WebSocket + maxReconnectionDelay: 10000, // max delay in ms between reconnections + minReconnectionDelay: 1000, // min delay in ms between reconnections + reconnectionDelayGrowFactor: 1.3, // how fast the reconnection delay grows + minUptime: 5000, // min time in ms to consider connection as stable + connectionTimeout: 4000, // retry connect if not connected after this time, in ms + maxRetries: Infinity, // maximum number of retries + maxEnqueuedMessages: 0, // maximum number of messages to buffer until reconnection + startClosed: false, // start websocket in CLOSED state, call `.reconnect()` to connect + debug: true, // enables debug output, + }, + ); + this.socket.addEventListener("message", this.onMessage); + this.socket.addEventListener("open", this.onOpen); + this.socket.addEventListener("close", this.onClose); + this.socket.addEventListener("error", this.onError); + }, + 1000, + { leading: true, trailing: false }, + ); + + // CFDO: the connections seem to keep hanging for some reason + public disconnect( + code: number = SocketClient.NORMAL_CLOSURE_CODE, + reason?: string, + ) { + if (this.isDisconnected) { + return; + } + + try { + window.removeEventListener("online", this.onOnline); + window.removeEventListener("offline", this.onOffline); + + console.debug( + `Disconnecting from the sync server with code "${code}"${ + reason ? ` and reason "${reason}".` : "." + }`, + ); + this.socket?.removeEventListener("message", this.onMessage); + this.socket?.removeEventListener("open", this.onOpen); + this.socket?.removeEventListener("close", this.onClose); + this.socket?.removeEventListener("error", this.onError); + + let remappedCode = code; + + switch (code) { + case 1009: { + // remapping the code, otherwise getting "The close code must be either 1000, or between 3000 and 4999. 1009 is neither." + remappedCode = SocketClient.MESSAGE_IS_TOO_LARGE_ERROR_CODE; + break; + } + } + + this.socket?.close(remappedCode, reason); + } finally { + this.socket = null; + } + } + + public send(message: { + type: "relay" | "pull" | "push"; + payload: any; + }): void { + if (this.isOffline) { + // connection opened, don't let the WS buffer the messages, + // as we do explicitly buffer unacknowledged increments + return; + } + + // CFDO: could be closed / closing / connecting + if (this.isDisconnected) { + this.connect(); + return; + } + + const { type, payload } = message; + + const stringifiedPayload = JSON.stringify(payload); + const payloadSize = new TextEncoder().encode(stringifiedPayload).byteLength; + + if (payloadSize < SocketClient.MAX_MESSAGE_SIZE) { + const message = new SocketMessage(type, stringifiedPayload); + return this.socket?.send(JSON.stringify(message)); + } + + const chunkId = randomId(); + const chunkSize = SocketClient.MAX_MESSAGE_SIZE; + const chunksCount = Math.ceil(payloadSize / chunkSize); + + for (let i = 0; i < chunksCount; i++) { + const start = i * chunkSize; + const end = start + chunkSize; + const chunkedPayload = stringifiedPayload.slice(start, end); + const message = new SocketMessage(type, chunkedPayload, { + id: chunkId, + order: i, + count: chunksCount, + }); + + this.socket?.send(JSON.stringify(message)); + } + } + + private onMessage = (event: MessageEvent) => { + this.handlers.onMessage(event); + }; + + private onOpen = (event: Event) => { + this.isOffline = false; + this.handlers.onOpen(event); + }; + + private onClose = (event: CloseEvent) => { + this.disconnect( + event.code || SocketClient.NO_STATUS_RECEIVED_ERROR_CODE, + event.reason || "Connection closed without a reason", + ); + this.connect(); + }; + + private onError = (event: Event) => { + this.disconnect( + event.type === "error" + ? SocketClient.ABNORMAL_CLOSURE_ERROR_CODE + : SocketClient.NO_STATUS_RECEIVED_ERROR_CODE, + `Received "${event.type}" on the sync connection`, + ); + this.connect(); + }; +} interface AcknowledgedIncrement { increment: StoreIncrement; @@ -26,10 +229,6 @@ interface AcknowledgedIncrement { } export class SyncClient { - public static readonly NORMAL_CLOSURE = 1000; - public static readonly NO_STATUS_RECEIVED_ERROR_CODE = 1005; - public static readonly ABNORMAL_CLOSURE_ERROR_CODE = 1006; - private static readonly HOST_URL = import.meta.env.DEV ? "ws://localhost:8787" : "https://excalidraw-sync.marcel-529.workers.dev"; @@ -38,11 +237,12 @@ export class SyncClient { ? "test_room_x" : "test_room_prod"; - private server: ReconnectingWebSocket | null = null; private readonly api: ExcalidrawImperativeAPI; private readonly queue: SyncQueue; - private readonly repository: MetadataRepository; + private readonly metadata: MetadataRepository; + private readonly client: SocketClient; + // #region ACKNOWLEDGED INCREMENTS & METADATA // CFDO: shouldn't be stateful, only request / response private readonly acknowledgedIncrementsMap: Map< string, @@ -55,8 +255,6 @@ export class SyncClient { .map((x) => x.increment); } - private readonly roomId: string; - private _lastAcknowledgedVersion = 0; private get lastAcknowledgedVersion() { @@ -65,26 +263,31 @@ export class SyncClient { private set lastAcknowledgedVersion(version: number) { this._lastAcknowledgedVersion = version; - this.repository.saveMetadata({ lastAcknowledgedVersion: version }); + this.metadata.saveMetadata({ lastAcknowledgedVersion: version }); } + // #endregion private constructor( api: ExcalidrawImperativeAPI, repository: MetadataRepository, queue: SyncQueue, - options: { roomId: string; lastAcknowledgedVersion: number }, + options: { host: string; roomId: string; lastAcknowledgedVersion: number }, ) { this.api = api; - this.repository = repository; + this.metadata = repository; this.queue = queue; - this.roomId = options.roomId; this.lastAcknowledgedVersion = options.lastAcknowledgedVersion; + this.client = new SocketClient(options.host, options.roomId, { + onOpen: this.onOpen, + onOnline: this.onOnline, + onMessage: this.onMessage, + }); } + // #region SYNC_CLIENT FACTORY public static async create( api: ExcalidrawImperativeAPI, repository: IncrementsRepository & MetadataRepository, - roomId: string = SyncClient.ROOM_ID, ) { const [queue, metadata] = await Promise.all([ SyncQueue.create(repository), @@ -92,116 +295,24 @@ export class SyncClient { ]); return new SyncClient(api, repository, queue, { - roomId, + host: SyncClient.HOST_URL, + roomId: SyncClient.ROOM_ID, lastAcknowledgedVersion: metadata?.lastAcknowledgedVersion ?? 0, }); } + // #endregion - public connect = throttle( - () => { - if (this.server && this.server.readyState !== this.server.CLOSED) { - return; - } + // #region PUBLIC API METHODS + public connect() { + return this.client.connect(); + } - console.log("Connecting to the sync server..."); - this.server = new ReconnectingWebSocket( - `${SyncClient.HOST_URL}/connect?roomId=${this.roomId}`, - [], - { - WebSocket: undefined, // WebSocket constructor, if none provided, defaults to global WebSocket - maxReconnectionDelay: 10000, // max delay in ms between reconnections - minReconnectionDelay: 1000, // min delay in ms between reconnections - reconnectionDelayGrowFactor: 1.3, // how fast the reconnection delay grows - minUptime: 5000, // min time in ms to consider connection as stable - connectionTimeout: 4000, // retry connect if not connected after this time, in ms - maxRetries: Infinity, // maximum number of retries - maxEnqueuedMessages: Infinity, // maximum number of messages to buffer until reconnection - startClosed: false, // start websocket in CLOSED state, call `.reconnect()` to connect - debug: false, // enables debug output, - }, - ); - this.server.addEventListener("message", this.onMessage); - this.server.addEventListener("open", this.onOpen); - this.server.addEventListener("close", this.onClose); - this.server.addEventListener("error", this.onError); - }, - 1000, - { leading: true, trailing: false }, - ); + public disconnect() { + return this.client.disconnect(); + } - public disconnect = throttle( - (code: number, reason?: string) => { - if (!this.server) { - return; - } - - if ( - this.server.readyState === this.server.CLOSED || - this.server.readyState === this.server.CLOSING - ) { - return; - } - - console.log( - `Disconnecting from the sync server with code "${code}"${ - reason ? ` and reason "${reason}".` : "." - }`, - ); - this.server.removeEventListener("message", this.onMessage); - this.server.removeEventListener("open", this.onOpen); - this.server.removeEventListener("close", this.onClose); - this.server.removeEventListener("error", this.onError); - this.server.close(code, reason); - }, - 1000, - { leading: true, trailing: false }, - ); - - private onOpen = (event: Event) => { - // CFDO: hack to pull everything for on init - this.pull(0); - }; - - private onClose = (event: CloseEvent) => { - this.disconnect( - event.code || SyncClient.NO_STATUS_RECEIVED_ERROR_CODE, - event.reason || "Connection closed without a reason", - ); - }; - - private onError = (event: Event) => { - this.disconnect( - event.type === "error" - ? SyncClient.ABNORMAL_CLOSURE_ERROR_CODE - : SyncClient.NO_STATUS_RECEIVED_ERROR_CODE, - `Received "${event.type}" on the sync connection`, - ); - }; - - // CFDO: could be an array buffer - private onMessage = (event: MessageEvent) => { - const [result, error] = Utils.try(() => JSON.parse(event.data as string)); - - if (error) { - console.error("Failed to parse message:", event.data); - return; - } - - const { type, payload } = result; - switch (type) { - case "relayed": - return this.handleRelayed(payload); - case "acknowledged": - return this.handleAcknowledged(payload); - case "rejected": - return this.handleRejected(payload); - default: - console.error("Unknown message type:", type); - } - }; - - private pull(sinceVersion?: number): void { - this.send({ + public pull(sinceVersion?: number): void { + this.client.send({ type: "pull", payload: { lastAcknowledgedVersion: sinceVersion ?? this.lastAcknowledgedVersion, @@ -224,26 +335,57 @@ export class SyncClient { } if (payload.increments.length > 0) { - this.send({ - type: "push", - payload, - }); + this.client.send({ type: "push", payload }); } } public relay(buffer: ArrayBuffer): void { - this.send({ + this.client.send({ type: "relay", payload: { buffer }, }); } + // #endregion - // CFDO: could be flushed once regular push / pull goes through - private schedulePush = debounce(() => this.push(), 1000); - private schedulePull = debounce(() => this.pull(), 1000); + // #region PRIVATE SOCKET MESSAGE HANDLERS + private onOpen = (event: Event) => { + // CFDO: hack to pull everything for on init + this.pull(0); + this.push(); + }; + + private onOnline = () => { + // perform incremental sync + this.pull(); + this.push(); + }; + + private onMessage = (event: MessageEvent) => { + // CFDO: could be an array buffer + const [result, error] = Utils.try(() => JSON.parse(event.data as string)); + + if (error) { + console.error("Failed to parse message:", event.data); + return; + } + + const { type, payload } = result; + switch (type) { + case "relayed": + return this.handleRelayed(payload); + case "acknowledged": + return this.handleAcknowledged(payload); + case "rejected": + return this.handleRejected(payload); + default: + console.error("Unknown message type:", type); + } + }; // CFDO: refactor by applying all operations to store, not to the elements - private handleAcknowledged(payload: { increments: Array }) { + private handleAcknowledged = (payload: { + increments: Array; + }) => { let nextAcknowledgedVersion = this.lastAcknowledgedVersion; let elements = new Map( // CFDO: retrieve the map already @@ -310,30 +452,26 @@ export class SyncClient { this.lastAcknowledgedVersion = nextAcknowledgedVersion; } catch (e) { console.error("Failed to apply acknowledged increments:", e); + // CFDO: might just be on error this.schedulePull(); - return; } + }; - this.schedulePush(); - } - - private handleRejected(payload: { ids: Array; message: string }) { + private handleRejected = (payload: { + ids: Array; + message: string; + }) => { // handle rejected increments console.error("Rejected message received:", payload); - } + }; - private handleRelayed(payload: { increments: Array }) { + private handleRelayed = (payload: { + increments: Array; + }) => { // apply relayed increments / buffer console.log("Relayed message received:", payload); - } + }; - private send(message: { type: string; payload: any }): void { - if (!this.server) { - throw new Error( - "Can't send a message without an established connection!", - ); - } - - this.server.send(JSON.stringify(message)); - } + private schedulePull = debounce(() => this.pull(), 1000); + // #endregion } diff --git a/packages/excalidraw/sync/protocol.ts b/packages/excalidraw/sync/protocol.ts index 73ebabbe38..4a0e1b88c0 100644 --- a/packages/excalidraw/sync/protocol.ts +++ b/packages/excalidraw/sync/protocol.ts @@ -9,6 +9,18 @@ export type PUSH_PAYLOAD = { export type CLIENT_INCREMENT = StoreIncrement; +export type CLIENT_MESSAGE_METADATA = { + id: string; + order: number; + count: number; +}; + +export type CLIENT_MESSAGE_RAW = { + type: "relay" | "pull" | "push"; + payload: string; + chunkInfo: CLIENT_MESSAGE_METADATA; +}; + export type CLIENT_MESSAGE = | { type: "relay"; payload: RELAY_PAYLOAD } | { type: "pull"; payload: PULL_PAYLOAD } diff --git a/packages/excalidraw/sync/server.ts b/packages/excalidraw/sync/server.ts index 0fb0bd3263..ead827543b 100644 --- a/packages/excalidraw/sync/server.ts +++ b/packages/excalidraw/sync/server.ts @@ -10,6 +10,7 @@ import type { RELAY_PAYLOAD, SERVER_MESSAGE, SERVER_INCREMENT, + CLIENT_MESSAGE_RAW, } from "./protocol"; // CFDO: message could be binary (cbor, protobuf, etc.) @@ -20,6 +21,10 @@ import type { export class ExcalidrawSyncServer { private readonly lock: AsyncLock = new AsyncLock(); private readonly sessions: Set = new Set(); + private readonly chunks = new Map< + CLIENT_MESSAGE_RAW["chunkInfo"]["id"], + Map + >(); constructor(private readonly incrementsRepository: IncrementsRepository) {} @@ -31,31 +36,119 @@ export class ExcalidrawSyncServer { this.sessions.delete(client); } - public onMessage(client: WebSocket, message: string) { - const [result, error] = Utils.try(() => - JSON.parse(message), + public onMessage(client: WebSocket, message: string): Promise | void { + const [parsedMessage, parseMessageError] = Utils.try( + () => { + return JSON.parse(message); + }, ); - if (error) { - console.error(error); + if (parseMessageError) { + console.error(parseMessageError); + return; + } + + const { type, payload, chunkInfo } = parsedMessage; + + // if there are more than 1 chunks, process them first + if (chunkInfo.count > 1) { + return this.processChunks(client, parsedMessage); + } + + const [parsedPayload, parsePayloadError] = Utils.try< + CLIENT_MESSAGE["payload"] + >(() => JSON.parse(payload)); + + if (parsePayloadError) { + console.error(parsePayloadError); return; } - const { type, payload } = result; switch (type) { case "relay": - return this.relay(client, payload); + return this.relay(client, parsedPayload as RELAY_PAYLOAD); case "pull": - return this.pull(client, payload); + return this.pull(client, parsedPayload as PULL_PAYLOAD); case "push": // apply each one-by-one to avoid race conditions // CFDO: in theory we do not need to block ephemeral appState changes - return this.lock.acquire("push", () => this.push(client, payload)); + return this.lock.acquire("push", () => + this.push(client, parsedPayload as PUSH_PAYLOAD), + ); default: console.error(`Unknown message type: ${type}`); } } + /** + * Process chunks in case the client-side payload would overflow the 1MiB durable object WS message limit. + */ + private processChunks(client: WebSocket, message: CLIENT_MESSAGE_RAW) { + let shouldCleanupchunks = true; + const { + type, + payload, + chunkInfo: { id, order, count }, + } = message; + + try { + if (!this.chunks.has(id)) { + this.chunks.set(id, new Map()); + } + + const chunks = this.chunks.get(id); + + if (!chunks) { + // defensive, shouldn't really happen + throw new Error(`Coudn't find a relevant chunk with id "${id}"!`); + } + + // set the buffer by order + chunks.set(order, payload); + + if (chunks.size !== count) { + // we don't have all the chunks, don't cleanup just yet! + shouldCleanupchunks = false; + return; + } + + // hopefully we can fit into the 128 MiB memory limit + const restoredPayload = Array.from(chunks) + .sort((a, b) => (a <= b ? -1 : 1)) + .reduce((acc, [_, payload]) => (acc += payload), ""); + + const rawMessage = JSON.stringify({ + type, + payload: restoredPayload, + // id is irrelevant if we are sending just one chunk + chunkInfo: { id: "", order: 0, count: 1 }, + } as CLIENT_MESSAGE_RAW); + + // process the message + return this.onMessage(client, rawMessage); + } catch (error) { + console.error(`Error while processing chunk "${id}"`, error); + } finally { + // cleanup the chunks + if (shouldCleanupchunks) { + this.chunks.delete(id); + } + } + } + + private relay( + client: WebSocket, + payload: { increments: Array } | RELAY_PAYLOAD, + ) { + return this.broadcast( + { + type: "relayed", + payload, + }, + client, + ); + } + private pull(client: WebSocket, payload: PULL_PAYLOAD) { // CFDO: test for invalid payload const lastAcknowledgedClientVersion = payload.lastAcknowledgedVersion; @@ -121,23 +214,10 @@ export class ExcalidrawSyncServer { }, }); default: - console.error(`Unknown message type: ${type}`); + console.error(`Unknown push message type: ${type}`); } } - private relay( - client: WebSocket, - payload: { increments: Array } | RELAY_PAYLOAD, - ) { - return this.broadcast( - { - type: "relayed", - payload, - }, - client, - ); - } - private send(client: WebSocket, message: SERVER_MESSAGE) { const msg = JSON.stringify(message); client.send(msg);