mirror of
https://github.com/excalidraw/excalidraw.git
synced 2025-05-03 10:00:07 -04:00
Decouple do from package
This commit is contained in:
parent
7b72406824
commit
f00069be68
14 changed files with 9 additions and 6604 deletions
|
@ -1,154 +0,0 @@
|
|||
import type {
|
||||
DeltasRepository,
|
||||
CLIENT_DELTA,
|
||||
SERVER_DELTA,
|
||||
SERVER_DELTA_STORAGE,
|
||||
} from "../sync/protocol";
|
||||
import { Network } from "../sync/utils";
|
||||
|
||||
// CFDO II: add senderId, possibly roomId as well
|
||||
export class DurableDeltasRepository implements DeltasRepository {
|
||||
// there is a 2MB row limit, hence working with max payload size of 1.5 MB
|
||||
// and leaving a ~500kB buffer for other row metadata
|
||||
private static readonly MAX_PAYLOAD_SIZE = 1_500_000;
|
||||
|
||||
constructor(private storage: DurableObjectStorage) {
|
||||
// #region DEV ONLY
|
||||
// this.storage.sql.exec(`DROP TABLE IF EXISTS deltas;`);
|
||||
// #endregion
|
||||
|
||||
this.storage.sql.exec(`CREATE TABLE IF NOT EXISTS deltas(
|
||||
id TEXT NOT NULL,
|
||||
version INTEGER NOT NULL,
|
||||
position INTEGER NOT NULL,
|
||||
payload BLOB NOT NULL,
|
||||
createdAt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
||||
PRIMARY KEY (id, version, position)
|
||||
);`);
|
||||
}
|
||||
|
||||
public save(delta: CLIENT_DELTA): SERVER_DELTA | null {
|
||||
return this.storage.transactionSync(() => {
|
||||
const existingDelta = this.getById(delta.id);
|
||||
|
||||
// don't perist the same delta twice
|
||||
if (existingDelta) {
|
||||
return existingDelta;
|
||||
}
|
||||
|
||||
try {
|
||||
const payloadBuffer = Network.toBinary(delta);
|
||||
const payloadSize = payloadBuffer.byteLength;
|
||||
const nextVersion = this.getLastVersion() + 1;
|
||||
const chunksCount = Math.ceil(
|
||||
payloadSize / DurableDeltasRepository.MAX_PAYLOAD_SIZE,
|
||||
);
|
||||
|
||||
for (let position = 0; position < chunksCount; position++) {
|
||||
const start = position * DurableDeltasRepository.MAX_PAYLOAD_SIZE;
|
||||
const end = start + DurableDeltasRepository.MAX_PAYLOAD_SIZE;
|
||||
const chunkedPayload = payloadBuffer.subarray(start, end);
|
||||
|
||||
this.storage.sql.exec(
|
||||
`INSERT INTO deltas (id, version, position, payload) VALUES (?, ?, ?, ?);`,
|
||||
delta.id,
|
||||
nextVersion,
|
||||
position,
|
||||
chunkedPayload,
|
||||
);
|
||||
}
|
||||
} catch (e) {
|
||||
// check if the delta has been already acknowledged
|
||||
// in case client for some reason did not receive acknowledgement
|
||||
// and reconnected while the we still have the delta in the worker
|
||||
// otherwise the client is doomed to full a restore
|
||||
if (e instanceof Error && e.message.includes("SQLITE_CONSTRAINT")) {
|
||||
// continue;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
const acknowledged = this.getById(delta.id);
|
||||
return acknowledged;
|
||||
});
|
||||
}
|
||||
|
||||
// CFDO: for versioning we need deletions, but not for the "snapshot" update;
|
||||
public getAllSinceVersion(version: number): Array<SERVER_DELTA> {
|
||||
const deltas = this.storage.sql
|
||||
.exec<SERVER_DELTA_STORAGE>(
|
||||
`SELECT id, payload, version, position FROM deltas WHERE version > (?) ORDER BY version, position, createdAt ASC;`,
|
||||
version,
|
||||
)
|
||||
.toArray();
|
||||
|
||||
return this.restorePayloadChunks(deltas);
|
||||
}
|
||||
|
||||
public getLastVersion(): number {
|
||||
// CFDO: might be in memory to reduce number of rows read (or position on version at least, if btree affect rows read)
|
||||
const result = this.storage.sql
|
||||
.exec(`SELECT MAX(version) FROM deltas;`)
|
||||
.one();
|
||||
|
||||
return result ? Number(result["MAX(version)"]) : 0;
|
||||
}
|
||||
|
||||
public getById(id: string): SERVER_DELTA | null {
|
||||
const deltas = this.storage.sql
|
||||
.exec<SERVER_DELTA_STORAGE>(
|
||||
`SELECT id, payload, version, position FROM deltas WHERE id = (?) ORDER BY position ASC`,
|
||||
id,
|
||||
)
|
||||
.toArray();
|
||||
|
||||
if (!deltas.length) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const restoredDeltas = this.restorePayloadChunks(deltas);
|
||||
|
||||
if (restoredDeltas.length !== 1) {
|
||||
throw new Error(
|
||||
`Expected exactly one restored delta, but received "${restoredDeltas.length}".`,
|
||||
);
|
||||
}
|
||||
|
||||
return restoredDeltas[0];
|
||||
}
|
||||
|
||||
private restorePayloadChunks(
|
||||
deltas: Array<SERVER_DELTA_STORAGE>,
|
||||
): Array<SERVER_DELTA> {
|
||||
return Array.from(
|
||||
deltas
|
||||
.reduce((acc, curr) => {
|
||||
const delta = acc.get(curr.version);
|
||||
|
||||
if (delta) {
|
||||
const currentPayload = new Uint8Array(curr.payload);
|
||||
acc.set(curr.version, {
|
||||
...delta,
|
||||
// glueing the chunks payload back
|
||||
payload: Uint8Array.from([...delta.payload, ...currentPayload]),
|
||||
});
|
||||
} else {
|
||||
// let's not unnecessarily expose more props than these (i.e. position)
|
||||
acc.set(curr.version, {
|
||||
id: curr.id,
|
||||
version: curr.version,
|
||||
payload: new Uint8Array(curr.payload),
|
||||
});
|
||||
}
|
||||
|
||||
return acc;
|
||||
// using Uint8Array instead of ArrayBuffer, as it has nicer methods
|
||||
}, new Map<number, Omit<SERVER_DELTA_STORAGE, "payload" | "position"> & { payload: Uint8Array }>())
|
||||
.values(),
|
||||
).map((delta) => ({
|
||||
...delta,
|
||||
payload: Network.fromBinary(delta.payload),
|
||||
}));
|
||||
}
|
||||
}
|
|
@ -1,60 +0,0 @@
|
|||
import { DurableObject } from "cloudflare:workers";
|
||||
import { DurableDeltasRepository } from "./repository";
|
||||
import { ExcalidrawSyncServer } from "../sync/server";
|
||||
|
||||
/**
|
||||
* Durable Object impl. of Excalidraw room.
|
||||
*/
|
||||
export class DurableRoom extends DurableObject {
|
||||
private roomId: string | null = null;
|
||||
private sync: ExcalidrawSyncServer;
|
||||
|
||||
constructor(ctx: DurableObjectState, env: Env) {
|
||||
super(ctx, env);
|
||||
|
||||
this.ctx.blockConcurrencyWhile(async () => {
|
||||
this.roomId = (await this.ctx.storage.get("roomId")) || null;
|
||||
});
|
||||
|
||||
const repository = new DurableDeltasRepository(ctx.storage);
|
||||
this.sync = new ExcalidrawSyncServer(repository);
|
||||
|
||||
// in case it hibernates, let's get take active connections
|
||||
for (const ws of this.ctx.getWebSockets()) {
|
||||
this.sync.onConnect(ws);
|
||||
}
|
||||
}
|
||||
|
||||
public fetch = async (request: Request): Promise<Response> =>
|
||||
this.connect(request);
|
||||
|
||||
public webSocketMessage = (client: WebSocket, message: ArrayBuffer) =>
|
||||
this.sync.onMessage(client, message);
|
||||
|
||||
public webSocketClose = (ws: WebSocket) => this.sync.onDisconnect(ws);
|
||||
|
||||
private connect(request: Request) {
|
||||
if (!this.roomId) {
|
||||
const roomId = new URL(request.url).searchParams.get("roomId");
|
||||
|
||||
if (!roomId) {
|
||||
return new Response(null, { status: 400 /* bad request */ });
|
||||
}
|
||||
|
||||
this.ctx.blockConcurrencyWhile(async () => {
|
||||
await this.ctx.storage.put("roomId", roomId);
|
||||
this.roomId = roomId;
|
||||
});
|
||||
}
|
||||
|
||||
const { 0: client, 1: server } = new WebSocketPair();
|
||||
|
||||
this.ctx.acceptWebSocket(client);
|
||||
this.sync.onConnect(client);
|
||||
|
||||
return new Response(null, {
|
||||
status: 101 /* switching protocols */,
|
||||
webSocket: server,
|
||||
});
|
||||
}
|
||||
}
|
|
@ -1,38 +0,0 @@
|
|||
export { DurableRoom } from "./room";
|
||||
|
||||
/**
|
||||
* Worker relay for Durable Room.
|
||||
*/
|
||||
export default {
|
||||
async fetch(
|
||||
request: Request,
|
||||
env: Env,
|
||||
ctx: ExecutionContext,
|
||||
): Promise<Response> {
|
||||
// CFDO: only auth user should reach this
|
||||
const upgrade = request.headers.get("upgrade");
|
||||
if (!upgrade || upgrade !== "websocket") {
|
||||
return new Response(null, { status: 426 /* upgrade required */ });
|
||||
}
|
||||
|
||||
if (request.method !== "GET") {
|
||||
return new Response(null, { status: 405 /* method not allowed */ });
|
||||
}
|
||||
|
||||
const url = new URL(request.url);
|
||||
if (url.pathname !== "/connect") {
|
||||
return new Response(null, { status: 403 /* forbidden */ });
|
||||
}
|
||||
|
||||
// CFDO: double check that the scene exists
|
||||
const roomId = url.searchParams.get("roomId");
|
||||
if (!roomId) {
|
||||
return new Response(null, { status: 400 /* bad request */ });
|
||||
}
|
||||
|
||||
const id: DurableObjectId = env.DURABLE_ROOM.idFromName(roomId);
|
||||
const room = env.DURABLE_ROOM.get(id);
|
||||
|
||||
return room.fetch(request);
|
||||
},
|
||||
};
|
|
@ -62,8 +62,6 @@
|
|||
"@excalidraw/random-username": "1.1.0",
|
||||
"@radix-ui/react-popover": "1.0.3",
|
||||
"@radix-ui/react-tabs": "1.0.2",
|
||||
"@types/async-lock": "^1.4.2",
|
||||
"async-lock": "^1.4.1",
|
||||
"browser-fs-access": "0.29.1",
|
||||
"canvas-roundrect-polyfill": "0.0.1",
|
||||
"clsx": "1.1.1",
|
||||
|
@ -74,9 +72,7 @@
|
|||
"image-blob-reduce": "3.0.1",
|
||||
"jotai": "2.11.0",
|
||||
"jotai-scope": "0.7.2",
|
||||
"lodash.debounce": "4.0.8",
|
||||
"lodash.throttle": "4.1.1",
|
||||
"msgpack-lite": "0.1.26",
|
||||
"nanoid": "3.3.3",
|
||||
"open-color": "1.9.1",
|
||||
"pako": "1.0.11",
|
||||
|
@ -87,7 +83,6 @@
|
|||
"png-chunks-extract": "1.0.0",
|
||||
"points-on-curve": "1.0.1",
|
||||
"pwacompat": "2.0.17",
|
||||
"reconnecting-websocket": "4.4.0",
|
||||
"roughjs": "4.6.4",
|
||||
"sass": "1.51.0",
|
||||
"tunnel-rat": "0.1.2"
|
||||
|
@ -101,13 +96,10 @@
|
|||
"@babel/preset-env": "7.24.5",
|
||||
"@babel/preset-react": "7.24.1",
|
||||
"@babel/preset-typescript": "7.24.1",
|
||||
"@cloudflare/workers-types": "4.20241112.0",
|
||||
"@size-limit/preset-big-lib": "9.0.0",
|
||||
"@testing-library/dom": "10.4.0",
|
||||
"@testing-library/jest-dom": "5.16.2",
|
||||
"@testing-library/react": "16.0.0",
|
||||
"@types/async-lock": "^1.4.2",
|
||||
"@types/msgpack-lite": "0.1.11",
|
||||
"@types/pako": "1.0.3",
|
||||
"@types/pica": "5.1.3",
|
||||
"@types/resize-observer-browser": "0.1.7",
|
||||
|
@ -132,8 +124,7 @@
|
|||
"size-limit": "9.0.0",
|
||||
"style-loader": "3.3.3",
|
||||
"ts-loader": "9.3.1",
|
||||
"typescript": "4.9.4",
|
||||
"wrangler": "^3.60.3"
|
||||
"typescript": "4.9.4"
|
||||
},
|
||||
"bugs": "https://github.com/excalidraw/excalidraw/issues",
|
||||
"homepage": "https://github.com/excalidraw/excalidraw/tree/master/packages/excalidraw",
|
||||
|
@ -143,9 +134,6 @@
|
|||
"pack": "yarn build:umd && yarn pack",
|
||||
"start": "node ../../scripts/buildExample.mjs && vite",
|
||||
"build:example": "node ../../scripts/buildExample.mjs",
|
||||
"size": "yarn build:umd && size-limit",
|
||||
"sync:deploy": "wrangler deploy",
|
||||
"sync:dev": "wrangler dev",
|
||||
"sync:typegen": "wrangler types --experimental-include-runtime=\"./worker-runtime.d.ts\""
|
||||
"size": "yarn build:umd && size-limit"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,7 +12,6 @@ import type {
|
|||
OrderedExcalidrawElement,
|
||||
SceneElementsMap,
|
||||
} from "./element/types";
|
||||
import type { SERVER_DELTA } from "./sync/protocol";
|
||||
import { arrayToMap, assertNever } from "./utils";
|
||||
import { hashElementsVersion } from "./element";
|
||||
import { syncMovedIndices } from "./fractionalIndex";
|
||||
|
@ -453,7 +452,7 @@ export class StoreDelta {
|
|||
public static load({
|
||||
id,
|
||||
elements: { added, removed, updated },
|
||||
}: SERVER_DELTA["payload"]) {
|
||||
}: DTO<StoreDelta>) {
|
||||
const elements = ElementsDelta.create(added, removed, updated, {
|
||||
shouldRedistribute: false,
|
||||
});
|
||||
|
|
|
@ -1,548 +0,0 @@
|
|||
import throttle from "lodash.throttle";
|
||||
import ReconnectingWebSocket, {
|
||||
type Event,
|
||||
type CloseEvent,
|
||||
} from "reconnecting-websocket";
|
||||
import { Network, Utils } from "./utils";
|
||||
import {
|
||||
LocalDeltasQueue,
|
||||
type MetadataRepository,
|
||||
type DeltasRepository,
|
||||
} from "./queue";
|
||||
import { StoreAction, StoreDelta } from "../store";
|
||||
import type { StoreChange } from "../store";
|
||||
import type { ExcalidrawImperativeAPI } from "../types";
|
||||
import type { ExcalidrawElement, SceneElementsMap } from "../element/types";
|
||||
import type {
|
||||
SERVER_DELTA,
|
||||
CLIENT_CHANGE,
|
||||
SERVER_MESSAGE,
|
||||
CLIENT_MESSAGE_BINARY,
|
||||
} from "./protocol";
|
||||
import { debounce } from "../utils";
|
||||
import { randomId } from "../random";
|
||||
import { orderByFractionalIndex } from "../fractionalIndex";
|
||||
import { ENV } from "../constants";
|
||||
|
||||
class SocketMessage implements CLIENT_MESSAGE_BINARY {
|
||||
constructor(
|
||||
public readonly type: "relay" | "pull" | "push",
|
||||
public readonly payload: Uint8Array,
|
||||
public readonly chunkInfo?: {
|
||||
id: string;
|
||||
position: number;
|
||||
count: number;
|
||||
},
|
||||
) {}
|
||||
}
|
||||
|
||||
class SocketClient {
|
||||
// Max size for outgoing messages is 1MiB (due to CFDO limits),
|
||||
// thus working with a slighter smaller limit of 800 kB (leaving 224kB for metadata)
|
||||
private static readonly MAX_MESSAGE_SIZE = 800_000;
|
||||
|
||||
private isOffline = true;
|
||||
private socket: ReconnectingWebSocket | null = null;
|
||||
|
||||
public get isDisconnected() {
|
||||
return !this.socket;
|
||||
}
|
||||
|
||||
constructor(
|
||||
private readonly host: string,
|
||||
private readonly roomId: String,
|
||||
private readonly handlers: {
|
||||
onOpen: (event: Event) => void;
|
||||
onOnline: () => void;
|
||||
onMessage: (message: SERVER_MESSAGE) => void;
|
||||
},
|
||||
) {}
|
||||
|
||||
private onOnline = () => {
|
||||
this.isOffline = false;
|
||||
this.handlers.onOnline();
|
||||
};
|
||||
|
||||
private onOffline = () => {
|
||||
this.isOffline = true;
|
||||
};
|
||||
|
||||
public connect = throttle(
|
||||
() => {
|
||||
if (!this.isDisconnected && !this.isOffline) {
|
||||
return;
|
||||
}
|
||||
|
||||
window.addEventListener("online", this.onOnline);
|
||||
window.addEventListener("offline", this.onOffline);
|
||||
|
||||
// eslint-disable-next-line no-console
|
||||
console.debug(`Connecting to the room "${this.roomId}"...`);
|
||||
this.socket = new ReconnectingWebSocket(
|
||||
`${this.host}/connect?roomId=${this.roomId}`,
|
||||
[],
|
||||
{
|
||||
WebSocket: undefined, // WebSocket constructor, if none provided, defaults to global WebSocket
|
||||
maxReconnectionDelay: 10000, // max delay in ms between reconnections
|
||||
minReconnectionDelay: 1000, // min delay in ms between reconnections
|
||||
reconnectionDelayGrowFactor: 1.3, // how fast the reconnection delay grows
|
||||
minUptime: 5000, // min time in ms to consider connection as stable
|
||||
connectionTimeout: 4000, // retry connect if not connected after this time, in ms
|
||||
maxRetries: Infinity, // maximum number of retries
|
||||
maxEnqueuedMessages: 0, // maximum number of messages to buffer until reconnection
|
||||
startClosed: false, // start websocket in CLOSED state, call `.reconnect()` to connect
|
||||
debug: false, // enables debug output,
|
||||
},
|
||||
);
|
||||
this.socket.addEventListener("message", this.onMessage);
|
||||
this.socket.addEventListener("open", this.onOpen);
|
||||
this.socket.addEventListener("close", this.onClose);
|
||||
this.socket.addEventListener("error", this.onError);
|
||||
},
|
||||
1000,
|
||||
{ leading: true, trailing: false },
|
||||
);
|
||||
|
||||
public disconnect() {
|
||||
if (this.isDisconnected) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
window.removeEventListener("online", this.onOnline);
|
||||
window.removeEventListener("offline", this.onOffline);
|
||||
|
||||
this.socket?.removeEventListener("message", this.onMessage);
|
||||
this.socket?.removeEventListener("open", this.onOpen);
|
||||
this.socket?.removeEventListener("close", this.onClose);
|
||||
this.socket?.removeEventListener("error", this.onError);
|
||||
this.socket?.close();
|
||||
|
||||
// eslint-disable-next-line no-console
|
||||
console.debug(`Disconnected from the room "${this.roomId}".`);
|
||||
} finally {
|
||||
this.socket = null;
|
||||
}
|
||||
}
|
||||
|
||||
public send(message: {
|
||||
type: "relay" | "pull" | "push";
|
||||
payload: Record<string, unknown>;
|
||||
}): void {
|
||||
if (this.isOffline) {
|
||||
// connection opened, don't let the WS buffer the messages,
|
||||
// as we do explicitly buffer unacknowledged deltas
|
||||
return;
|
||||
}
|
||||
|
||||
if (this.isDisconnected) {
|
||||
this.connect();
|
||||
return;
|
||||
}
|
||||
|
||||
const { type, payload } = message;
|
||||
|
||||
const payloadBuffer = Network.toBinary(payload);
|
||||
const payloadSize = payloadBuffer.byteLength;
|
||||
|
||||
if (import.meta.env.DEV || import.meta.env.MODE === ENV.TEST) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.debug("send", message, payloadSize);
|
||||
}
|
||||
|
||||
if (payloadSize < SocketClient.MAX_MESSAGE_SIZE) {
|
||||
const message = new SocketMessage(type, payloadBuffer);
|
||||
return this.sendMessage(message);
|
||||
}
|
||||
|
||||
const chunkId = randomId();
|
||||
const chunkSize = SocketClient.MAX_MESSAGE_SIZE;
|
||||
const chunksCount = Math.ceil(payloadSize / chunkSize);
|
||||
|
||||
for (let position = 0; position < chunksCount; position++) {
|
||||
const start = position * chunkSize;
|
||||
const end = start + chunkSize;
|
||||
const chunkedPayload = payloadBuffer.subarray(start, end);
|
||||
const message = new SocketMessage(type, chunkedPayload, {
|
||||
id: chunkId,
|
||||
position,
|
||||
count: chunksCount,
|
||||
});
|
||||
|
||||
this.sendMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
private onMessage = (event: MessageEvent) => {
|
||||
this.receiveMessage(event.data).then((message) => {
|
||||
if (!message) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (import.meta.env.DEV || import.meta.env.MODE === ENV.TEST) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.debug("onMessage", message);
|
||||
}
|
||||
|
||||
this.handlers.onMessage(message);
|
||||
});
|
||||
};
|
||||
|
||||
private onOpen = (event: Event) => {
|
||||
// eslint-disable-next-line no-console
|
||||
console.debug(`Connection to the room "${this.roomId}" opened.`);
|
||||
this.isOffline = false;
|
||||
this.handlers.onOpen(event);
|
||||
};
|
||||
|
||||
private onClose = (event: CloseEvent) => {
|
||||
// eslint-disable-next-line no-console
|
||||
console.debug(`Connection to the room "${this.roomId}" closed.`, event);
|
||||
};
|
||||
|
||||
private onError = (event: Event) => {
|
||||
// eslint-disable-next-line no-console
|
||||
console.error(
|
||||
`Connection to the room "${this.roomId}" returned an error.`,
|
||||
event,
|
||||
);
|
||||
};
|
||||
|
||||
private sendMessage = (message: CLIENT_MESSAGE_BINARY) => {
|
||||
this.socket?.send(Network.encodeClientMessage(message));
|
||||
};
|
||||
|
||||
// CFDO: should be (runtime) type-safe
|
||||
private async receiveMessage(
|
||||
message: Blob,
|
||||
): Promise<SERVER_MESSAGE | undefined> {
|
||||
const arrayBuffer = await message.arrayBuffer();
|
||||
const uint8Array = new Uint8Array(arrayBuffer);
|
||||
|
||||
const [decodedMessage, decodingError] = Utils.try<SERVER_MESSAGE>(() =>
|
||||
Network.fromBinary(uint8Array),
|
||||
);
|
||||
|
||||
if (decodingError) {
|
||||
console.error("Failed to decode message:", message);
|
||||
return;
|
||||
}
|
||||
|
||||
return decodedMessage;
|
||||
}
|
||||
}
|
||||
|
||||
interface AcknowledgedDelta {
|
||||
delta: StoreDelta;
|
||||
version: number;
|
||||
}
|
||||
|
||||
export class SyncClient {
|
||||
private static readonly HOST_URL = import.meta.env.DEV
|
||||
? "ws://localhost:8787"
|
||||
: "https://excalidraw-sync.marcel-529.workers.dev";
|
||||
|
||||
private static readonly ROOM_ID = import.meta.env.DEV
|
||||
? "test_room_x"
|
||||
: "test_room_prod";
|
||||
|
||||
private readonly api: ExcalidrawImperativeAPI;
|
||||
private readonly localDeltas: LocalDeltasQueue;
|
||||
private readonly metadata: MetadataRepository;
|
||||
private readonly client: SocketClient;
|
||||
|
||||
private relayedElementsVersionsCache = new Map<
|
||||
string,
|
||||
ExcalidrawElement["version"]
|
||||
>();
|
||||
|
||||
// #region ACKNOWLEDGED DELTAS & METADATA
|
||||
// CFDO II: shouldn't be stateful, only request / response
|
||||
private readonly acknowledgedDeltasMap: Map<string, AcknowledgedDelta> =
|
||||
new Map();
|
||||
|
||||
public get acknowledgedDeltas() {
|
||||
return Array.from(this.acknowledgedDeltasMap.values())
|
||||
.sort((a, b) => (a.version < b.version ? -1 : 1))
|
||||
.map((x) => x.delta);
|
||||
}
|
||||
|
||||
private _lastAcknowledgedVersion = 0;
|
||||
|
||||
private get lastAcknowledgedVersion() {
|
||||
return this._lastAcknowledgedVersion;
|
||||
}
|
||||
|
||||
private set lastAcknowledgedVersion(version: number) {
|
||||
this._lastAcknowledgedVersion = version;
|
||||
this.metadata.saveMetadata({ lastAcknowledgedVersion: version });
|
||||
}
|
||||
// #endregion
|
||||
|
||||
private constructor(
|
||||
api: ExcalidrawImperativeAPI,
|
||||
repository: MetadataRepository,
|
||||
queue: LocalDeltasQueue,
|
||||
options: { host: string; roomId: string; lastAcknowledgedVersion: number },
|
||||
) {
|
||||
this.api = api;
|
||||
this.metadata = repository;
|
||||
this.localDeltas = queue;
|
||||
this.lastAcknowledgedVersion = options.lastAcknowledgedVersion;
|
||||
this.client = new SocketClient(options.host, options.roomId, {
|
||||
onOpen: this.onOpen,
|
||||
onOnline: this.onOnline,
|
||||
onMessage: this.onMessage,
|
||||
});
|
||||
}
|
||||
|
||||
// #region SYNC_CLIENT FACTORY
|
||||
public static async create(
|
||||
api: ExcalidrawImperativeAPI,
|
||||
repository: DeltasRepository & MetadataRepository,
|
||||
) {
|
||||
const queue = await LocalDeltasQueue.create(repository);
|
||||
// CFDO: temporary for custom roomId (though E+ will be similar)
|
||||
const roomId = window.location.pathname.split("/").at(-1);
|
||||
|
||||
return new SyncClient(api, repository, queue, {
|
||||
host: SyncClient.HOST_URL,
|
||||
roomId: roomId ?? SyncClient.ROOM_ID,
|
||||
// CFDO II: temporary, so that all deltas are loaded and applied on init
|
||||
lastAcknowledgedVersion: 0,
|
||||
});
|
||||
}
|
||||
// #endregion
|
||||
|
||||
// #region PUBLIC API METHODS
|
||||
public connect() {
|
||||
this.client.connect();
|
||||
}
|
||||
|
||||
public disconnect() {
|
||||
this.client.disconnect();
|
||||
this.relayedElementsVersionsCache.clear();
|
||||
}
|
||||
|
||||
public pull(sinceVersion?: number): void {
|
||||
this.client.send({
|
||||
type: "pull",
|
||||
payload: {
|
||||
lastAcknowledgedVersion: sinceVersion ?? this.lastAcknowledgedVersion,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
public push(delta?: StoreDelta): void {
|
||||
if (delta) {
|
||||
this.localDeltas.add(delta);
|
||||
}
|
||||
|
||||
// re-send all already queued deltas
|
||||
for (const delta of this.localDeltas.getAll()) {
|
||||
this.client.send({
|
||||
type: "push",
|
||||
payload: {
|
||||
...delta,
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// CFDO: should be throttled! 16ms (60 fps) for live scenes, not needed at all for single player
|
||||
public relay(change: StoreChange): void {
|
||||
if (this.client.isDisconnected) {
|
||||
// don't reconnect if we're explicitly disconnected
|
||||
// otherwise versioning slider would trigger sync on every slider step
|
||||
return;
|
||||
}
|
||||
|
||||
let shouldRelay = false;
|
||||
|
||||
for (const [id, element] of Object.entries(change.elements)) {
|
||||
const cachedElementVersion = this.relayedElementsVersionsCache.get(id);
|
||||
|
||||
if (!cachedElementVersion || cachedElementVersion < element.version) {
|
||||
this.relayedElementsVersionsCache.set(id, element.version);
|
||||
|
||||
if (!shouldRelay) {
|
||||
// it's enough that a single element is not cached or is outdated in cache
|
||||
// to relay the whole change, otherwise we skip the relay as we've already received this change
|
||||
shouldRelay = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!shouldRelay) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.client.send({
|
||||
type: "relay",
|
||||
payload: { ...change },
|
||||
});
|
||||
}
|
||||
// #endregion
|
||||
|
||||
// #region PRIVATE SOCKET MESSAGE HANDLERS
|
||||
private onOpen = (event: Event) => {
|
||||
// CFDO II: hack to pull everything for on init
|
||||
this.pull(0);
|
||||
this.push();
|
||||
};
|
||||
|
||||
private onOnline = () => {
|
||||
// perform incremental sync
|
||||
this.pull();
|
||||
this.push();
|
||||
};
|
||||
|
||||
private onMessage = (serverMessage: SERVER_MESSAGE) => {
|
||||
const { type, payload } = serverMessage;
|
||||
|
||||
switch (type) {
|
||||
case "relayed":
|
||||
return this.handleRelayed(payload);
|
||||
case "acknowledged":
|
||||
return this.handleAcknowledged(payload);
|
||||
// case "rejected":
|
||||
// return this.handleRejected(payload);
|
||||
default:
|
||||
console.error("Unknown message type:", type);
|
||||
}
|
||||
};
|
||||
|
||||
private handleRelayed = (payload: CLIENT_CHANGE) => {
|
||||
// CFDO I: retrieve the map already
|
||||
const nextElements = new Map(
|
||||
this.api.getSceneElementsIncludingDeleted().map((el) => [el.id, el]),
|
||||
) as SceneElementsMap;
|
||||
|
||||
try {
|
||||
const { elements: relayedElements } = payload;
|
||||
|
||||
for (const [id, relayedElement] of Object.entries(relayedElements)) {
|
||||
const existingElement = nextElements.get(id);
|
||||
|
||||
if (
|
||||
!existingElement || // new element
|
||||
existingElement.version < relayedElement.version // updated element
|
||||
) {
|
||||
nextElements.set(id, relayedElement);
|
||||
this.relayedElementsVersionsCache.set(id, relayedElement.version);
|
||||
}
|
||||
}
|
||||
|
||||
const orderedElements = orderByFractionalIndex(
|
||||
Array.from(nextElements.values()),
|
||||
);
|
||||
|
||||
this.api.updateScene({
|
||||
elements: orderedElements,
|
||||
storeAction: StoreAction.UPDATE,
|
||||
});
|
||||
} catch (e) {
|
||||
console.error("Failed to apply relayed change:", e);
|
||||
}
|
||||
};
|
||||
|
||||
// CFDO: refactor by applying all operations to store, not to the elements
|
||||
private handleAcknowledged = (payload: { deltas: Array<SERVER_DELTA> }) => {
|
||||
let prevSnapshot = this.api.store.snapshot;
|
||||
|
||||
try {
|
||||
const remoteDeltas = Array.from(payload.deltas);
|
||||
const applicableDeltas: Array<StoreDelta> = [];
|
||||
const appState = this.api.getAppState();
|
||||
|
||||
let nextAcknowledgedVersion = this.lastAcknowledgedVersion;
|
||||
let nextElements = new Map(
|
||||
// CFDO: retrieve the map already
|
||||
this.api.getSceneElementsIncludingDeleted().map((el) => [el.id, el]),
|
||||
) as SceneElementsMap;
|
||||
|
||||
for (const { id, version, payload } of remoteDeltas) {
|
||||
// CFDO II: temporary to load all deltas on init
|
||||
this.acknowledgedDeltasMap.set(id, {
|
||||
delta: StoreDelta.load(payload),
|
||||
version,
|
||||
});
|
||||
|
||||
// we've already applied this delta!
|
||||
if (version <= nextAcknowledgedVersion) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// CFDO: strictly checking for out of order deltas; might be relaxed if it becomes a problem
|
||||
if (version !== nextAcknowledgedVersion + 1) {
|
||||
throw new Error(
|
||||
`Received out of order delta, expected "${
|
||||
nextAcknowledgedVersion + 1
|
||||
}", but received "${version}"`,
|
||||
);
|
||||
}
|
||||
|
||||
if (this.localDeltas.has(id)) {
|
||||
// local delta does not have to be applied again
|
||||
this.localDeltas.remove(id);
|
||||
} else {
|
||||
// this is a new remote delta, adding it to the list of applicable deltas
|
||||
const remoteDelta = StoreDelta.load(payload);
|
||||
applicableDeltas.push(remoteDelta);
|
||||
}
|
||||
|
||||
nextAcknowledgedVersion = version;
|
||||
}
|
||||
|
||||
// adding all yet unacknowledged local deltas
|
||||
const localDeltas = this.localDeltas.getAll();
|
||||
applicableDeltas.push(...localDeltas);
|
||||
|
||||
for (const delta of applicableDeltas) {
|
||||
[nextElements] = this.api.store.applyDeltaTo(
|
||||
delta,
|
||||
nextElements,
|
||||
appState,
|
||||
{
|
||||
triggerIncrement: false,
|
||||
updateSnapshot: true,
|
||||
},
|
||||
);
|
||||
|
||||
prevSnapshot = this.api.store.snapshot;
|
||||
}
|
||||
|
||||
const orderedElements = orderByFractionalIndex(
|
||||
Array.from(nextElements.values()),
|
||||
);
|
||||
|
||||
// CFDO: might need to restore first due to potentially stale delta versions
|
||||
this.api.updateScene({
|
||||
elements: orderedElements,
|
||||
// even though the snapshot should be up-to-date already,
|
||||
// still some more updates might be triggered,
|
||||
// i.e. as a result from syncing invalid indices
|
||||
storeAction: StoreAction.UPDATE,
|
||||
});
|
||||
|
||||
this.lastAcknowledgedVersion = nextAcknowledgedVersion;
|
||||
} catch (e) {
|
||||
console.error("Failed to apply acknowledged deltas:", e);
|
||||
// rollback to the previous snapshot, so that we don't end up in an incosistent state
|
||||
this.api.store.snapshot = prevSnapshot;
|
||||
// schedule another fresh pull in case of a failure
|
||||
this.schedulePull();
|
||||
}
|
||||
};
|
||||
|
||||
private handleRejected = (payload: {
|
||||
ids: Array<string>;
|
||||
message: Uint8Array;
|
||||
}) => {
|
||||
// handle rejected deltas
|
||||
console.error("Rejected message received:", payload);
|
||||
};
|
||||
|
||||
private schedulePull = debounce(() => this.pull(), 1000);
|
||||
// #endregion
|
||||
}
|
|
@ -1,68 +0,0 @@
|
|||
import type { StoreChange, StoreDelta } from "../store";
|
||||
import type { DTO } from "../utility-types";
|
||||
|
||||
export type CLIENT_DELTA = DTO<StoreDelta>;
|
||||
export type CLIENT_CHANGE = DTO<StoreChange>;
|
||||
|
||||
export type RESTORE_PAYLOAD = {};
|
||||
export type RELAY_PAYLOAD = CLIENT_CHANGE;
|
||||
export type PUSH_PAYLOAD = CLIENT_DELTA;
|
||||
export type PULL_PAYLOAD = { lastAcknowledgedVersion: number };
|
||||
|
||||
export type CHUNK_INFO = {
|
||||
id: string;
|
||||
position: number;
|
||||
count: number;
|
||||
};
|
||||
|
||||
export type CLIENT_MESSAGE = (
|
||||
| { type: "restore"; payload: RESTORE_PAYLOAD }
|
||||
| { type: "relay"; payload: RELAY_PAYLOAD }
|
||||
| { type: "pull"; payload: PULL_PAYLOAD }
|
||||
| { type: "push"; payload: PUSH_PAYLOAD }
|
||||
) & { chunkInfo?: CHUNK_INFO };
|
||||
|
||||
export type CLIENT_MESSAGE_BINARY = {
|
||||
type: CLIENT_MESSAGE["type"];
|
||||
payload: Uint8Array;
|
||||
chunkInfo?: CHUNK_INFO;
|
||||
};
|
||||
|
||||
export type SERVER_DELTA = {
|
||||
id: CLIENT_DELTA["id"];
|
||||
version: number;
|
||||
payload: CLIENT_DELTA;
|
||||
};
|
||||
|
||||
export type SERVER_DELTA_STORAGE = {
|
||||
id: SERVER_DELTA["id"];
|
||||
version: SERVER_DELTA["version"];
|
||||
position: number;
|
||||
payload: ArrayBuffer;
|
||||
};
|
||||
|
||||
export type SERVER_MESSAGE =
|
||||
| {
|
||||
type: "relayed";
|
||||
payload: RELAY_PAYLOAD;
|
||||
}
|
||||
| { type: "acknowledged"; payload: { deltas: Array<SERVER_DELTA> } }
|
||||
| {
|
||||
type: "rejected";
|
||||
payload: { deltas: Array<CLIENT_DELTA>; message: string };
|
||||
}
|
||||
| { type: "restored"; payload: { elements: Array<ExcalidrawElement> } };
|
||||
|
||||
export interface DeltasRepository {
|
||||
save(delta: CLIENT_DELTA): SERVER_DELTA | null;
|
||||
getAllSinceVersion(version: number): Array<SERVER_DELTA>;
|
||||
getLastVersion(): number;
|
||||
}
|
||||
|
||||
// CFDO: should come from the shared types package (might need a bigger refactor)
|
||||
export type ExcalidrawElement = {
|
||||
id: string;
|
||||
type: any;
|
||||
version: number;
|
||||
[key: string]: any;
|
||||
};
|
|
@ -1,74 +0,0 @@
|
|||
import throttle from "lodash.throttle";
|
||||
import type { StoreDelta } from "../store";
|
||||
|
||||
export interface DeltasRepository {
|
||||
loadDeltas(): Promise<Array<StoreDelta> | null>;
|
||||
saveDeltas(params: StoreDelta[]): Promise<void>;
|
||||
}
|
||||
|
||||
export interface MetadataRepository {
|
||||
loadMetadata(): Promise<{ lastAcknowledgedVersion: number } | null>;
|
||||
saveMetadata(metadata: { lastAcknowledgedVersion: number }): Promise<void>;
|
||||
}
|
||||
|
||||
export class LocalDeltasQueue {
|
||||
private readonly queue: Map<string, StoreDelta>;
|
||||
private readonly repository: DeltasRepository;
|
||||
|
||||
private constructor(
|
||||
queue: Map<string, StoreDelta> = new Map(),
|
||||
repository: DeltasRepository,
|
||||
) {
|
||||
this.queue = queue;
|
||||
this.repository = repository;
|
||||
}
|
||||
|
||||
public static async create(repository: DeltasRepository) {
|
||||
const deltas = await repository.loadDeltas();
|
||||
|
||||
return new LocalDeltasQueue(
|
||||
new Map(deltas?.map((delta) => [delta.id, delta])),
|
||||
repository,
|
||||
);
|
||||
}
|
||||
|
||||
public getAll() {
|
||||
return Array.from(this.queue.values());
|
||||
}
|
||||
|
||||
public get(id: StoreDelta["id"]) {
|
||||
return this.queue.get(id);
|
||||
}
|
||||
|
||||
public has(id: StoreDelta["id"]) {
|
||||
return this.queue.has(id);
|
||||
}
|
||||
|
||||
public add(...deltas: StoreDelta[]) {
|
||||
for (const delta of deltas) {
|
||||
this.queue.set(delta.id, delta);
|
||||
}
|
||||
|
||||
this.persist();
|
||||
}
|
||||
|
||||
public remove(...ids: StoreDelta["id"][]) {
|
||||
for (const id of ids) {
|
||||
this.queue.delete(id);
|
||||
}
|
||||
|
||||
this.persist();
|
||||
}
|
||||
|
||||
public persist = throttle(
|
||||
async () => {
|
||||
try {
|
||||
await this.repository.saveDeltas(this.getAll());
|
||||
} catch (e) {
|
||||
console.error("Failed to persist the sync queue:", e);
|
||||
}
|
||||
},
|
||||
1000,
|
||||
{ leading: false, trailing: true },
|
||||
);
|
||||
}
|
|
@ -1,308 +0,0 @@
|
|||
import AsyncLock from "async-lock";
|
||||
import { Network, Utils } from "./utils";
|
||||
|
||||
import type {
|
||||
DeltasRepository,
|
||||
PULL_PAYLOAD,
|
||||
PUSH_PAYLOAD,
|
||||
SERVER_MESSAGE,
|
||||
SERVER_DELTA,
|
||||
CHUNK_INFO,
|
||||
RELAY_PAYLOAD,
|
||||
CLIENT_MESSAGE_BINARY,
|
||||
CLIENT_MESSAGE,
|
||||
ExcalidrawElement,
|
||||
} from "./protocol";
|
||||
import { StoreDelta } from "../store";
|
||||
|
||||
/**
|
||||
* Core excalidraw sync logic.
|
||||
*/
|
||||
export class ExcalidrawSyncServer {
|
||||
private readonly lock: AsyncLock = new AsyncLock();
|
||||
private readonly sessions: Set<WebSocket> = new Set();
|
||||
private readonly chunks = new Map<
|
||||
CHUNK_INFO["id"],
|
||||
Map<CHUNK_INFO["position"], CLIENT_MESSAGE_BINARY["payload"]>
|
||||
>();
|
||||
|
||||
// CFDO II: load from the db
|
||||
private elements = new Map<string, ExcalidrawElement>();
|
||||
|
||||
constructor(private readonly repository: DeltasRepository) {
|
||||
// CFDO II: load from the db
|
||||
const deltas = this.repository.getAllSinceVersion(0);
|
||||
|
||||
for (const delta of deltas) {
|
||||
const storeDelta = StoreDelta.load(delta.payload);
|
||||
|
||||
// CFDO II: fix types (everywhere)
|
||||
const [nextElements] = storeDelta.elements.applyTo(this.elements as any);
|
||||
|
||||
this.elements = nextElements;
|
||||
}
|
||||
}
|
||||
|
||||
// CFDO: optimize, should send a message about collaborators (no collaborators => no need to send ephemerals)
|
||||
public onConnect(client: WebSocket) {
|
||||
this.sessions.add(client);
|
||||
}
|
||||
|
||||
public onDisconnect(client: WebSocket) {
|
||||
this.sessions.delete(client);
|
||||
}
|
||||
|
||||
public onMessage(
|
||||
client: WebSocket,
|
||||
message: ArrayBuffer,
|
||||
): Promise<void> | void {
|
||||
const [rawMessage, parsingError] = Utils.try<CLIENT_MESSAGE_BINARY>(() =>
|
||||
Network.decodeClientMessage(message),
|
||||
);
|
||||
|
||||
if (parsingError) {
|
||||
console.error(parsingError);
|
||||
return;
|
||||
}
|
||||
|
||||
// if there is chunkInfo, there are more than 1 chunks => process them first
|
||||
if (rawMessage.chunkInfo) {
|
||||
return this.processChunks(client, {
|
||||
...rawMessage,
|
||||
chunkInfo: rawMessage.chunkInfo,
|
||||
});
|
||||
}
|
||||
|
||||
return this.processMessage(client, rawMessage);
|
||||
}
|
||||
|
||||
/**
|
||||
* Process chunks in case the client-side payload would overflow the 1MiB durable object WS message limit.
|
||||
*/
|
||||
private processChunks(
|
||||
client: WebSocket,
|
||||
message: CLIENT_MESSAGE_BINARY &
|
||||
Required<Pick<CLIENT_MESSAGE_BINARY, "chunkInfo">>,
|
||||
) {
|
||||
let shouldCleanupchunks = true;
|
||||
const {
|
||||
type,
|
||||
payload,
|
||||
chunkInfo: { id, position, count },
|
||||
} = message;
|
||||
|
||||
try {
|
||||
if (!this.chunks.has(id)) {
|
||||
this.chunks.set(id, new Map());
|
||||
}
|
||||
|
||||
const chunks = this.chunks.get(id);
|
||||
|
||||
if (!chunks) {
|
||||
// defensive, shouldn't really happen
|
||||
throw new Error(`Coudn't find a relevant chunk with id "${id}"!`);
|
||||
}
|
||||
|
||||
// set the buffer by order
|
||||
chunks.set(position, payload);
|
||||
|
||||
if (chunks.size !== count) {
|
||||
// we don't have all the chunks, don't cleanup just yet!
|
||||
shouldCleanupchunks = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// hopefully we can fit into the 128 MiB memory limit
|
||||
const restoredPayload = Array.from(chunks)
|
||||
.sort(([positionA], [positionB]) => (positionA <= positionB ? -1 : 1))
|
||||
.reduce(
|
||||
(acc, [_, payload]) => Uint8Array.from([...acc, ...payload]),
|
||||
new Uint8Array(),
|
||||
);
|
||||
|
||||
const rawMessage = {
|
||||
type,
|
||||
payload: restoredPayload,
|
||||
} as Omit<CLIENT_MESSAGE_BINARY, "chunkInfo">;
|
||||
|
||||
return this.processMessage(client, rawMessage);
|
||||
} catch (error) {
|
||||
console.error(`Error while processing chunk "${id}"`, error);
|
||||
} finally {
|
||||
// cleanup the chunks
|
||||
if (shouldCleanupchunks) {
|
||||
this.chunks.delete(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private processMessage(
|
||||
client: WebSocket,
|
||||
{ type, payload }: Omit<CLIENT_MESSAGE_BINARY, "chunkInfo">,
|
||||
) {
|
||||
const [parsedPayload, parsingError] = Utils.try<CLIENT_MESSAGE["payload"]>(
|
||||
() => Network.fromBinary(payload),
|
||||
);
|
||||
|
||||
if (parsingError) {
|
||||
console.error(parsingError);
|
||||
return;
|
||||
}
|
||||
|
||||
switch (type) {
|
||||
case "restore":
|
||||
return this.restore(client);
|
||||
case "relay":
|
||||
return this.relay(client, parsedPayload as RELAY_PAYLOAD);
|
||||
case "pull":
|
||||
return this.pull(client, parsedPayload as PULL_PAYLOAD);
|
||||
case "push":
|
||||
// apply each one-by-one to avoid race conditions
|
||||
// CFDO: in theory we do not need to block ephemeral appState (for now we are not even using them)
|
||||
return this.lock.acquire("push", () =>
|
||||
this.push(client, parsedPayload as PUSH_PAYLOAD),
|
||||
);
|
||||
default:
|
||||
console.error(`Unknown message type: ${type}`);
|
||||
}
|
||||
}
|
||||
|
||||
private restore(client: WebSocket) {
|
||||
return this.send(client, {
|
||||
type: "restored",
|
||||
payload: {
|
||||
elements: Array.from(this.elements.values()),
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
private relay(client: WebSocket, payload: RELAY_PAYLOAD) {
|
||||
// CFDO I: we should likely apply these to the snapshot
|
||||
return this.broadcast(
|
||||
{
|
||||
type: "relayed",
|
||||
payload,
|
||||
},
|
||||
client,
|
||||
);
|
||||
}
|
||||
|
||||
private pull(client: WebSocket, payload: PULL_PAYLOAD) {
|
||||
// CFDO: test for invalid payload
|
||||
const lastAcknowledgedClientVersion = payload.lastAcknowledgedVersion;
|
||||
const lastAcknowledgedServerVersion = this.repository.getLastVersion();
|
||||
|
||||
const versionΔ =
|
||||
lastAcknowledgedServerVersion - lastAcknowledgedClientVersion;
|
||||
|
||||
if (versionΔ < 0) {
|
||||
// CFDO II: restore the client from the snapshot / deltas?
|
||||
console.error(
|
||||
`Panic! Client claims to have higher acknowledged version than the latest one on the server!`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const deltas: SERVER_DELTA[] = [];
|
||||
|
||||
if (versionΔ > 0) {
|
||||
deltas.push(
|
||||
...this.repository.getAllSinceVersion(lastAcknowledgedClientVersion),
|
||||
);
|
||||
}
|
||||
|
||||
this.send(client, {
|
||||
type: "acknowledged",
|
||||
payload: {
|
||||
deltas,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
private push(client: WebSocket, delta: PUSH_PAYLOAD) {
|
||||
const [storeDelta, applyingError] = Utils.try(() => {
|
||||
// update the "deleted" delta according to the latest changes (in case of concurrent changes)
|
||||
const storeDelta = StoreDelta.applyLatestChanges(
|
||||
StoreDelta.load(delta),
|
||||
this.elements as any,
|
||||
"deleted",
|
||||
);
|
||||
|
||||
// apply the delta to the elements snapshot
|
||||
const [nextElements] = storeDelta.elements.applyTo(this.elements as any);
|
||||
|
||||
this.elements = nextElements;
|
||||
|
||||
return storeDelta;
|
||||
});
|
||||
|
||||
if (applyingError) {
|
||||
// CFDO: everything should be automatically rolled-back in the db -> double-check
|
||||
return this.send(client, {
|
||||
type: "rejected",
|
||||
payload: {
|
||||
message: applyingError
|
||||
? applyingError.message
|
||||
: "Couldn't apply the delta.",
|
||||
deltas: [delta],
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
const [acknowledged, savingError] = Utils.try(() => {
|
||||
return this.repository.save(storeDelta);
|
||||
});
|
||||
|
||||
if (savingError || !acknowledged) {
|
||||
// CFDO: everything should be automatically rolled-back in the db -> double-check
|
||||
return this.send(client, {
|
||||
type: "rejected",
|
||||
payload: {
|
||||
message: savingError
|
||||
? savingError.message
|
||||
: "Coudn't persist the delta.",
|
||||
deltas: [storeDelta],
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
return this.broadcast({
|
||||
type: "acknowledged",
|
||||
payload: {
|
||||
deltas: [acknowledged],
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
private send(ws: WebSocket, message: SERVER_MESSAGE) {
|
||||
const [encodedMessage, encodingError] = Utils.try(() =>
|
||||
Network.toBinary(message),
|
||||
);
|
||||
|
||||
if (encodingError) {
|
||||
console.error(encodingError);
|
||||
return;
|
||||
}
|
||||
|
||||
ws.send(encodedMessage);
|
||||
}
|
||||
|
||||
private broadcast(message: SERVER_MESSAGE, exclude?: WebSocket) {
|
||||
const [encodedMessage, encodingError] = Utils.try(() =>
|
||||
Network.toBinary(message),
|
||||
);
|
||||
|
||||
if (encodingError) {
|
||||
console.error(encodingError);
|
||||
return;
|
||||
}
|
||||
|
||||
for (const ws of this.sessions) {
|
||||
if (ws === exclude) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ws.send(encodedMessage);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,68 +0,0 @@
|
|||
import msgpack from "msgpack-lite";
|
||||
|
||||
import type { CLIENT_MESSAGE_BINARY } from "./protocol";
|
||||
|
||||
export const Utils = {
|
||||
try<T>(cb: () => T): [T, null] | [null, Error] {
|
||||
try {
|
||||
const result = cb();
|
||||
return [result, null];
|
||||
} catch (error) {
|
||||
if (error instanceof Error) {
|
||||
return [null, error];
|
||||
}
|
||||
|
||||
if (typeof error === "string") {
|
||||
return [null, new Error(error)];
|
||||
}
|
||||
|
||||
return [null, new Error("Unknown error")];
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
export const Network = {
|
||||
toBinary: (payload: Record<string, unknown>) => {
|
||||
return new Uint8Array(msgpack.encode(payload));
|
||||
},
|
||||
fromBinary: (payload: Uint8Array) => {
|
||||
return msgpack.decode(payload);
|
||||
},
|
||||
encodeClientMessage: (message: CLIENT_MESSAGE_BINARY) => {
|
||||
const { payload, ...metadata } = message;
|
||||
const metadataBuffer = Network.toBinary(metadata);
|
||||
|
||||
// contains the length of the rest of the message, so that we could chunk the payload and decode it server side
|
||||
const headerBuffer = new ArrayBuffer(4);
|
||||
new DataView(headerBuffer).setUint32(0, metadataBuffer.byteLength);
|
||||
|
||||
// concatenate into [header(4 bytes)][metadata][payload]
|
||||
return Uint8Array.from([
|
||||
...new Uint8Array(headerBuffer),
|
||||
...metadataBuffer,
|
||||
...message.payload,
|
||||
]);
|
||||
},
|
||||
decodeClientMessage: (message: ArrayBuffer) => {
|
||||
const headerLength = 4;
|
||||
const header = new Uint8Array(message, 0, headerLength);
|
||||
const metadataLength = new DataView(
|
||||
header.buffer,
|
||||
header.byteOffset,
|
||||
).getUint32(0);
|
||||
|
||||
const metadata = new Uint8Array(
|
||||
message,
|
||||
headerLength,
|
||||
headerLength + metadataLength,
|
||||
);
|
||||
|
||||
const payload = new Uint8Array(message, headerLength + metadataLength);
|
||||
const rawMessage = {
|
||||
...Network.fromBinary(metadata),
|
||||
payload,
|
||||
} as CLIENT_MESSAGE_BINARY;
|
||||
|
||||
return rawMessage;
|
||||
},
|
||||
};
|
|
@ -1,7 +0,0 @@
|
|||
// Generated by Wrangler by running `wrangler types --experimental-include-runtime=./worker-runtime.d.ts`
|
||||
|
||||
interface Env {
|
||||
DURABLE_ROOM: DurableObjectNamespace<
|
||||
import("./cloudflare/worker").DurableRoom
|
||||
>;
|
||||
}
|
4691
packages/excalidraw/worker-runtime.d.ts
vendored
4691
packages/excalidraw/worker-runtime.d.ts
vendored
File diff suppressed because it is too large
Load diff
|
@ -1,23 +0,0 @@
|
|||
#:schema node_modules/wrangler/config-schema.json
|
||||
name = "excalidraw-sync"
|
||||
main = "cloudflare/worker.ts"
|
||||
compatibility_date = "2024-11-12"
|
||||
|
||||
# Workers Logs
|
||||
# Docs: https://developers.cloudflare.com/workers/observability/logs/workers-logs/
|
||||
# Configuration: https://developers.cloudflare.com/workers/observability/logs/workers-logs/#enable-workers-logs
|
||||
[observability]
|
||||
enabled = true
|
||||
|
||||
# Bind a Durable Object. Durable objects are a scale-to-zero compute primitive based on the actor model.
|
||||
# Durable Objects can live for as long as needed. Use these when you need a long-running "server", such as in realtime apps.
|
||||
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#durable-objects
|
||||
[[durable_objects.bindings]]
|
||||
name = "DURABLE_ROOM"
|
||||
class_name = "DurableRoom"
|
||||
|
||||
# Durable Object migrations.
|
||||
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#migrations
|
||||
[[migrations]]
|
||||
tag = "v1"
|
||||
new_sqlite_classes = ["DurableRoom"]
|
Loading…
Add table
Add a link
Reference in a new issue