From f6061f5ec600b05d256b9a0cd22f5017c8bc9f02 Mon Sep 17 00:00:00 2001 From: Marcel Mraz Date: Mon, 30 Dec 2024 13:44:53 +0100 Subject: [PATCH] Sharding rows due to SQLite limits --- excalidraw-app/App.tsx | 2 +- excalidraw-app/data/LocalData.ts | 36 +++-- packages/excalidraw/cloudflare/repository.ts | 137 +++++++++++++------ packages/excalidraw/sync/client.ts | 101 ++++++-------- packages/excalidraw/sync/protocol.ts | 28 ++-- packages/excalidraw/sync/queue.ts | 12 +- packages/excalidraw/sync/server.ts | 106 +++++++------- 7 files changed, 228 insertions(+), 194 deletions(-) diff --git a/excalidraw-app/App.tsx b/excalidraw-app/App.tsx index 6fe1921f1d..8184967408 100644 --- a/excalidraw-app/App.tsx +++ b/excalidraw-app/App.tsx @@ -697,7 +697,7 @@ const ExcalidrawWrapper = () => { // CFDO: some appState like selections should also be transfered (we could even persist it) if (!elementsChange.isEmpty()) { try { - syncAPI?.push("durable", increment); + syncAPI?.push(increment); } catch (e) { console.error(e); } diff --git a/excalidraw-app/data/LocalData.ts b/excalidraw-app/data/LocalData.ts index 8091905fa2..0c0868b4d5 100644 --- a/excalidraw-app/data/LocalData.ts +++ b/excalidraw-app/data/LocalData.ts @@ -108,12 +108,12 @@ export class LocalData { files: BinaryFiles, onFilesSaved: () => void, ) => { - saveDataStateToLocalStorage(elements, appState); - await this.fileStorage.saveFiles({ - elements, - files, - }); - onFilesSaved(); + // saveDataStateToLocalStorage(elements, appState); + // await this.fileStorage.saveFiles({ + // elements, + // files, + // }); + // onFilesSaved(); }, SAVE_TO_LOCAL_STORAGE_TIMEOUT, ); @@ -260,13 +260,11 @@ export class LibraryLocalStorageMigrationAdapter { } } -interface SyncIncrementPersistedData { - increments: DTO[]; -} +type SyncIncrementPersistedData = DTO[]; -interface SyncMetaPersistedData { +type SyncMetaPersistedData = { lastAcknowledgedVersion: number; -} +}; export class SyncIndexedDBAdapter { /** IndexedDB database and store name */ @@ -281,17 +279,15 @@ export class SyncIndexedDBAdapter { ); static async loadIncrements() { - const IDBData = await get( + const increments = await get( SyncIndexedDBAdapter.incrementsKey, SyncIndexedDBAdapter.store, ); - if (IDBData?.increments?.length) { - return { - increments: IDBData.increments.map((storeIncrementDTO) => - StoreIncrement.restore(storeIncrementDTO), - ), - }; + if (increments?.length) { + return increments.map((storeIncrementDTO) => + StoreIncrement.restore(storeIncrementDTO), + ); } return null; @@ -306,12 +302,12 @@ export class SyncIndexedDBAdapter { } static async loadMetadata() { - const IDBData = await get( + const metadata = await get( SyncIndexedDBAdapter.metadataKey, SyncIndexedDBAdapter.store, ); - return IDBData || null; + return metadata || null; } static async saveMetadata(data: SyncMetaPersistedData): Promise { diff --git a/packages/excalidraw/cloudflare/repository.ts b/packages/excalidraw/cloudflare/repository.ts index 3c20665af1..73648e768d 100644 --- a/packages/excalidraw/cloudflare/repository.ts +++ b/packages/excalidraw/cloudflare/repository.ts @@ -6,71 +6,87 @@ import type { // CFDO: add senderId, possibly roomId as well export class DurableIncrementsRepository implements IncrementsRepository { + // there is a 2MB row limit, hence working max row size of 1.5 MB + // and leaving a buffer for other row metadata + private static readonly MAX_PAYLOAD_SIZE = 1_500_000; + constructor(private storage: DurableObjectStorage) { // #region DEV ONLY // this.storage.sql.exec(`DROP TABLE IF EXISTS increments;`); // #endregion - // CFDO: payload has just 2MB limit, which might not be enough this.storage.sql.exec(`CREATE TABLE IF NOT EXISTS increments( - version INTEGER PRIMARY KEY AUTOINCREMENT, - id TEXT NOT NULL UNIQUE, + id TEXT NOT NULL, + version INTEGER NOT NULL, + position INTEGER NOT NULL, + payload TEXT NOT NULL, createdAt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, - payload TEXT + PRIMARY KEY (id, version, position) );`); } - public saveAll(increments: Array) { + public save(increment: CLIENT_INCREMENT): SERVER_INCREMENT | null { return this.storage.transactionSync(() => { - const prevVersion = this.getLastVersion(); - const acknowledged: Array = []; + const existingIncrement = this.getById(increment.id); + + // don't perist the same increment twice + if (existingIncrement) { + return existingIncrement; + } + + try { + const payload = JSON.stringify(increment); + const payloadSize = new TextEncoder().encode(payload).byteLength; + const chunkVersion = this.getLastVersion() + 1; + const chunksCount = Math.ceil( + payloadSize / DurableIncrementsRepository.MAX_PAYLOAD_SIZE, + ); + + for (let position = 0; position < chunksCount; position++) { + const start = position * DurableIncrementsRepository.MAX_PAYLOAD_SIZE; + const end = start + DurableIncrementsRepository.MAX_PAYLOAD_SIZE; + // slicing the chunk payload + const chunkedPayload = payload.slice(start, end); - for (const increment of increments) { - try { - // unique id ensures that we don't acknowledge the same increment twice this.storage.sql.exec( - `INSERT INTO increments (id, payload) VALUES (?, ?);`, + `INSERT INTO increments (id, version, position, payload) VALUES (?, ?, ?, ?);`, increment.id, - JSON.stringify(increment), + chunkVersion, + position, + chunkedPayload, ); - } catch (e) { - // check if the increment has been already acknowledged - // in case client for some reason did not receive acknowledgement - // and reconnected while the we still have the increment in the worker - // otherwise the client is doomed to full a restore - if ( - e instanceof Error && - e.message.includes( - "UNIQUE constraint failed: increments.id: SQLITE_CONSTRAINT", - ) - ) { - acknowledged.push(this.getById(increment.id)); - continue; - } - + } + } catch (e) { + // check if the increment has been already acknowledged + // in case client for some reason did not receive acknowledgement + // and reconnected while the we still have the increment in the worker + // otherwise the client is doomed to full a restore + if (e instanceof Error && e.message.includes("SQLITE_CONSTRAINT")) { + // continue; + } else { throw e; } } - // query the just added increments - acknowledged.push(...this.getSinceVersion(prevVersion)); - + const acknowledged = this.getById(increment.id); return acknowledged; }); } - public getSinceVersion(version: number): Array { - // CFDO: for versioning we need deletions, but not for the "snapshot" update; - return this.storage.sql + // CFDO: for versioning we need deletions, but not for the "snapshot" update; + public getAllSinceVersion(version: number): Array { + const increments = this.storage.sql .exec( - `SELECT id, payload, version FROM increments WHERE version > (?) ORDER BY version, createdAt ASC;`, + `SELECT id, payload, version FROM increments WHERE version > (?) ORDER BY version, position, createdAt ASC;`, version, ) .toArray(); + + return this.restoreServerIncrements(increments); } public getLastVersion(): number { - // CFDO: might be in memory to reduce number of rows read (or index on version at least, if btree affect rows read) + // CFDO: might be in memory to reduce number of rows read (or position on version at least, if btree affect rows read) const result = this.storage.sql .exec(`SELECT MAX(version) FROM increments;`) .one(); @@ -78,12 +94,55 @@ export class DurableIncrementsRepository implements IncrementsRepository { return result ? Number(result["MAX(version)"]) : 0; } - public getById(id: string): SERVER_INCREMENT { - return this.storage.sql + public getById(id: string): SERVER_INCREMENT | null { + const increments = this.storage.sql .exec( - `SELECT id, payload, version FROM increments WHERE id = (?)`, + `SELECT id, payload, version FROM increments WHERE id = (?) ORDER BY position ASC`, id, ) - .one(); + .toArray(); + + if (!increments.length) { + return null; + } + + const restoredIncrements = this.restoreServerIncrements(increments); + + if (restoredIncrements.length !== 1) { + throw new Error( + `Expected exactly one restored increment, but received "${restoredIncrements.length}".`, + ); + } + + return restoredIncrements[0]; + } + + private restoreServerIncrements( + increments: SERVER_INCREMENT[], + ): SERVER_INCREMENT[] { + return Array.from( + increments + .reduce((acc, curr) => { + const increment = acc.get(curr.version); + + if (increment) { + acc.set(curr.version, { + ...increment, + // glueing the chunks payload back + payload: increment.payload + curr.payload, + }); + } else { + // let's not unnecessarily expose more props than these + acc.set(curr.version, { + id: curr.id, + version: curr.version, + payload: curr.payload, + }); + } + + return acc; + }, new Map()) + .values(), + ); } } diff --git a/packages/excalidraw/sync/client.ts b/packages/excalidraw/sync/client.ts index 9859851e35..6392080dd7 100644 --- a/packages/excalidraw/sync/client.ts +++ b/packages/excalidraw/sync/client.ts @@ -16,7 +16,6 @@ import type { SceneElementsMap } from "../element/types"; import type { CLIENT_INCREMENT, CLIENT_MESSAGE_RAW, - PUSH_PAYLOAD, SERVER_INCREMENT, } from "./protocol"; import { debounce } from "../utils"; @@ -26,14 +25,10 @@ class SocketMessage implements CLIENT_MESSAGE_RAW { constructor( public readonly type: "relay" | "pull" | "push", public readonly payload: string, - public readonly chunkInfo: { + public readonly chunkInfo?: { id: string; - order: number; + position: number; count: number; - } = { - id: "", - order: 0, - count: 1, }, ) {} } @@ -100,7 +95,7 @@ class SocketClient { maxRetries: Infinity, // maximum number of retries maxEnqueuedMessages: 0, // maximum number of messages to buffer until reconnection startClosed: false, // start websocket in CLOSED state, call `.reconnect()` to connect - debug: true, // enables debug output, + debug: false, // enables debug output, }, ); this.socket.addEventListener("message", this.onMessage); @@ -181,13 +176,13 @@ class SocketClient { const chunkSize = SocketClient.MAX_MESSAGE_SIZE; const chunksCount = Math.ceil(payloadSize / chunkSize); - for (let i = 0; i < chunksCount; i++) { - const start = i * chunkSize; + for (let position = 0; position < chunksCount; position++) { + const start = position * chunkSize; const end = start + chunkSize; const chunkedPayload = stringifiedPayload.slice(start, end); const message = new SocketMessage(type, chunkedPayload, { id: chunkId, - order: i, + position, count: chunksCount, }); @@ -289,15 +284,11 @@ export class SyncClient { api: ExcalidrawImperativeAPI, repository: IncrementsRepository & MetadataRepository, ) { - const [queue, metadata] = await Promise.all([ - SyncQueue.create(repository), - repository.loadMetadata(), - ]); - + const queue = await SyncQueue.create(repository); return new SyncClient(api, repository, queue, { host: SyncClient.HOST_URL, roomId: SyncClient.ROOM_ID, - lastAcknowledgedVersion: metadata?.lastAcknowledgedVersion ?? 0, + lastAcknowledgedVersion: 0, }); } // #endregion @@ -320,22 +311,19 @@ export class SyncClient { }); } - public push( - type: "durable" | "ephemeral" = "durable", - ...increments: Array - ): void { - const payload: PUSH_PAYLOAD = { type, increments: [] }; - - if (type === "durable") { - this.queue.add(...increments); - // batch all (already) queued increments - payload.increments = this.queue.getAll(); - } else { - payload.increments = increments; + public push(increment?: StoreIncrement): void { + if (increment) { + this.queue.add(increment); } - if (payload.increments.length > 0) { - this.client.send({ type: "push", payload }); + // re-send all already queued increments + for (const queuedIncrement of this.queue.getAll()) { + this.client.send({ + type: "push", + payload: { + ...queuedIncrement, + }, + }); } } @@ -403,12 +391,6 @@ export class SyncClient { version, }); - // local increment shall not have to be applied again - if (this.queue.has(id)) { - this.queue.remove(id); - continue; - } - // we've already applied this increment if (version <= nextAcknowledgedVersion) { continue; @@ -419,7 +401,7 @@ export class SyncClient { } else { // it's fine to apply increments our of order, // as they are idempontent, so that we can re-apply them again, - // as long as we don't mark them as acknowledged + // as long as we don't mark their version as acknowledged console.debug( `Received out of order increment, expected "${ nextAcknowledgedVersion + 1 @@ -427,27 +409,32 @@ export class SyncClient { ); } - // apply remote increment with higher version than the last acknowledged one - const remoteIncrement = StoreIncrement.load(payload); - [elements] = remoteIncrement.elementsChange.applyTo( - elements, - this.api.store.snapshot.elements, - ); - } + // local increment shall not have to be applied again + if (this.queue.has(id)) { + this.queue.remove(id); + } else { + // apply remote increment with higher version than the last acknowledged one + const remoteIncrement = StoreIncrement.load(payload); + [elements] = remoteIncrement.elementsChange.applyTo( + elements, + this.api.store.snapshot.elements, + ); + } - // apply local increments - for (const localIncrement of this.queue.getAll()) { - // CFDO: in theory only necessary when remote increments modified same element properties! - [elements] = localIncrement.elementsChange.applyTo( - elements, - this.api.store.snapshot.elements, - ); - } + // apply local increments + for (const localIncrement of this.queue.getAll()) { + // CFDO: in theory only necessary when remote increments modified same element properties! + [elements] = localIncrement.elementsChange.applyTo( + elements, + this.api.store.snapshot.elements, + ); + } - this.api.updateScene({ - elements: Array.from(elements.values()), - storeAction: "update", - }); + this.api.updateScene({ + elements: Array.from(elements.values()), + storeAction: "update", + }); + } this.lastAcknowledgedVersion = nextAcknowledgedVersion; } catch (e) { diff --git a/packages/excalidraw/sync/protocol.ts b/packages/excalidraw/sync/protocol.ts index 4a0e1b88c0..541f5662de 100644 --- a/packages/excalidraw/sync/protocol.ts +++ b/packages/excalidraw/sync/protocol.ts @@ -1,36 +1,36 @@ import type { StoreIncrement } from "../store"; +import type { DTO } from "../utility-types"; + +export type CLIENT_INCREMENT = DTO; export type RELAY_PAYLOAD = { buffer: ArrayBuffer }; export type PULL_PAYLOAD = { lastAcknowledgedVersion: number }; -export type PUSH_PAYLOAD = { - type: "durable" | "ephemeral"; - increments: Array; -}; +export type PUSH_PAYLOAD = CLIENT_INCREMENT; -export type CLIENT_INCREMENT = StoreIncrement; - -export type CLIENT_MESSAGE_METADATA = { +export type CHUNK_INFO = { id: string; - order: number; + position: number; count: number; }; export type CLIENT_MESSAGE_RAW = { type: "relay" | "pull" | "push"; payload: string; - chunkInfo: CLIENT_MESSAGE_METADATA; + chunkInfo?: CHUNK_INFO; }; -export type CLIENT_MESSAGE = +export type CLIENT_MESSAGE = { chunkInfo: CHUNK_INFO } & ( | { type: "relay"; payload: RELAY_PAYLOAD } | { type: "pull"; payload: PULL_PAYLOAD } - | { type: "push"; payload: PUSH_PAYLOAD }; + | { type: "push"; payload: PUSH_PAYLOAD } +); export type SERVER_INCREMENT = { id: string; version: number; payload: string }; export type SERVER_MESSAGE = | { type: "relayed"; - payload: { increments: Array } | RELAY_PAYLOAD; + // CFDO: should likely be just elements + // payload: { increments: Array } | RELAY_PAYLOAD; } | { type: "acknowledged"; payload: { increments: Array } } | { @@ -39,8 +39,8 @@ export type SERVER_MESSAGE = }; export interface IncrementsRepository { - saveAll(increments: Array): Array; - getSinceVersion(version: number): Array; + save(increment: CLIENT_INCREMENT): SERVER_INCREMENT | null; + getAllSinceVersion(version: number): Array; getLastVersion(): number; } diff --git a/packages/excalidraw/sync/queue.ts b/packages/excalidraw/sync/queue.ts index f441b7c46f..e442a6d705 100644 --- a/packages/excalidraw/sync/queue.ts +++ b/packages/excalidraw/sync/queue.ts @@ -2,8 +2,8 @@ import throttle from "lodash.throttle"; import type { StoreIncrement } from "../store"; export interface IncrementsRepository { - loadIncrements(): Promise<{ increments: Array } | null>; - saveIncrements(params: { increments: Array }): Promise; + loadIncrements(): Promise | null>; + saveIncrements(params: StoreIncrement[]): Promise; } export interface MetadataRepository { @@ -25,10 +25,10 @@ export class SyncQueue { } public static async create(repository: IncrementsRepository) { - const data = await repository.loadIncrements(); + const increments = await repository.loadIncrements(); return new SyncQueue( - new Map(data?.increments?.map((increment) => [increment.id, increment])), + new Map(increments?.map((increment) => [increment.id, increment])), repository, ); } @@ -64,7 +64,7 @@ export class SyncQueue { public persist = throttle( async () => { try { - await this.repository.saveIncrements({ increments: this.getAll() }); + await this.repository.saveIncrements(this.getAll()); } catch (e) { console.error("Failed to persist the sync queue:", e); } @@ -72,4 +72,4 @@ export class SyncQueue { 1000, { leading: false, trailing: true }, ); -} \ No newline at end of file +} diff --git a/packages/excalidraw/sync/server.ts b/packages/excalidraw/sync/server.ts index ead827543b..5661fc3a73 100644 --- a/packages/excalidraw/sync/server.ts +++ b/packages/excalidraw/sync/server.ts @@ -3,14 +3,13 @@ import { Utils } from "./utils"; import type { IncrementsRepository, - CLIENT_INCREMENT, CLIENT_MESSAGE, PULL_PAYLOAD, PUSH_PAYLOAD, - RELAY_PAYLOAD, SERVER_MESSAGE, SERVER_INCREMENT, CLIENT_MESSAGE_RAW, + CHUNK_INFO, } from "./protocol"; // CFDO: message could be binary (cbor, protobuf, etc.) @@ -22,12 +21,13 @@ export class ExcalidrawSyncServer { private readonly lock: AsyncLock = new AsyncLock(); private readonly sessions: Set = new Set(); private readonly chunks = new Map< - CLIENT_MESSAGE_RAW["chunkInfo"]["id"], - Map + CHUNK_INFO["id"], + Map >(); constructor(private readonly incrementsRepository: IncrementsRepository) {} + // CFDO: should send a message about collaborators (no collaborators => no need to send ephemerals) public onConnect(client: WebSocket) { this.sessions.add(client); } @@ -50,9 +50,9 @@ export class ExcalidrawSyncServer { const { type, payload, chunkInfo } = parsedMessage; - // if there are more than 1 chunks, process them first - if (chunkInfo.count > 1) { - return this.processChunks(client, parsedMessage); + // if there is chunkInfo, there are more than 1 chunks => process them first + if (chunkInfo) { + return this.processChunks(client, { type, payload, chunkInfo }); } const [parsedPayload, parsePayloadError] = Utils.try< @@ -65,8 +65,8 @@ export class ExcalidrawSyncServer { } switch (type) { - case "relay": - return this.relay(client, parsedPayload as RELAY_PAYLOAD); + // case "relay": + // return this.relay(client, parsedPayload as RELAY_PAYLOAD); case "pull": return this.pull(client, parsedPayload as PULL_PAYLOAD); case "push": @@ -83,12 +83,15 @@ export class ExcalidrawSyncServer { /** * Process chunks in case the client-side payload would overflow the 1MiB durable object WS message limit. */ - private processChunks(client: WebSocket, message: CLIENT_MESSAGE_RAW) { + private processChunks( + client: WebSocket, + message: CLIENT_MESSAGE_RAW & { chunkInfo: CHUNK_INFO }, + ) { let shouldCleanupchunks = true; const { type, payload, - chunkInfo: { id, order, count }, + chunkInfo: { id, position, count }, } = message; try { @@ -104,7 +107,7 @@ export class ExcalidrawSyncServer { } // set the buffer by order - chunks.set(order, payload); + chunks.set(position, payload); if (chunks.size !== count) { // we don't have all the chunks, don't cleanup just yet! @@ -120,8 +123,6 @@ export class ExcalidrawSyncServer { const rawMessage = JSON.stringify({ type, payload: restoredPayload, - // id is irrelevant if we are sending just one chunk - chunkInfo: { id: "", order: 0, count: 1 }, } as CLIENT_MESSAGE_RAW); // process the message @@ -136,18 +137,18 @@ export class ExcalidrawSyncServer { } } - private relay( - client: WebSocket, - payload: { increments: Array } | RELAY_PAYLOAD, - ) { - return this.broadcast( - { - type: "relayed", - payload, - }, - client, - ); - } + // private relay( + // client: WebSocket, + // payload: { increments: Array } | RELAY_PAYLOAD, + // ) { + // return this.broadcast( + // { + // type: "relayed", + // payload, + // }, + // client, + // ); + // } private pull(client: WebSocket, payload: PULL_PAYLOAD) { // CFDO: test for invalid payload @@ -170,7 +171,7 @@ export class ExcalidrawSyncServer { if (versionĪ” > 0) { increments.push( - ...this.incrementsRepository.getSinceVersion( + ...this.incrementsRepository.getAllSinceVersion( lastAcknowledgedClientVersion, ), ); @@ -184,38 +185,29 @@ export class ExcalidrawSyncServer { }); } - private push(client: WebSocket, payload: PUSH_PAYLOAD) { - const { type, increments } = payload; + private push(client: WebSocket, increment: PUSH_PAYLOAD) { + // CFDO: try to apply the increments to the snapshot + const [acknowledged, error] = Utils.try(() => + this.incrementsRepository.save(increment), + ); - switch (type) { - case "ephemeral": - return this.relay(client, { increments }); - case "durable": - // CFDO: try to apply the increments to the snapshot - const [acknowledged, error] = Utils.try(() => - this.incrementsRepository.saveAll(increments), - ); - - if (error) { - // everything should be automatically rolled-back -> double-check - return this.send(client, { - type: "rejected", - payload: { - message: error.message, - increments, - }, - }); - } - - return this.broadcast({ - type: "acknowledged", - payload: { - increments: acknowledged, - }, - }); - default: - console.error(`Unknown push message type: ${type}`); + if (error || !acknowledged) { + // everything should be automatically rolled-back -> double-check + return this.send(client, { + type: "rejected", + payload: { + message: error ? error.message : "Coudn't persist the increment", + increments: [increment], + }, + }); } + + return this.broadcast({ + type: "acknowledged", + payload: { + increments: [acknowledged], + }, + }); } private send(client: WebSocket, message: SERVER_MESSAGE) {