Temporarily move sync into package

This commit is contained in:
Marcel Mraz 2024-11-22 16:24:15 +01:00
parent 245d681b7d
commit 508cfbc843
No known key found for this signature in database
GPG key ID: 4EBD6E62DC830CD2
13 changed files with 1110 additions and 34 deletions

View file

@ -0,0 +1,64 @@
import type {
ChangesRepository,
CLIENT_CHANGE,
SERVER_CHANGE,
} from "../sync/protocol";
// TODO: add senderId, possibly roomId as well
export class DurableChangesRepository implements ChangesRepository {
constructor(private storage: DurableObjectStorage) {
// #region DEV ONLY
this.storage.sql.exec(`DROP TABLE IF EXISTS changes;`);
// #endregion
this.storage.sql.exec(`CREATE TABLE IF NOT EXISTS changes(
id TEXT PRIMARY KEY,
payload TEXT NOT NULL,
version INTEGER NOT NULL DEFAULT 1,
createdAt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
);`);
}
public saveAll = (changes: Array<CLIENT_CHANGE>) =>
this.storage.transactionSync(() => {
const prevVersion = this.getLastVersion();
const nextVersion = prevVersion + changes.length;
// TODO: in theory payload could contain array of changes, if we would need to optimize writes
for (const [index, change] of changes.entries()) {
const version = prevVersion + index + 1;
// unique id ensures that we don't acknowledge the same change twice
this.storage.sql.exec(
`INSERT INTO changes (id, payload, version) VALUES (?, ?, ?);`,
change.id,
JSON.stringify(change),
version,
);
}
// sanity check
if (nextVersion !== this.getLastVersion()) {
throw new Error(
`Expected last acknowledged version to be "${nextVersion}", but it is "${this.getLastVersion()}!"`,
);
}
return this.getSinceVersion(prevVersion);
});
public getSinceVersion = (version: number): Array<SERVER_CHANGE> =>
this.storage.sql
.exec<SERVER_CHANGE>(
`SELECT id, payload, version FROM changes WHERE version > (?) ORDER BY version ASC;`,
version,
)
.toArray();
public getLastVersion = (): number => {
const result = this.storage.sql
.exec(`SELECT MAX(version) FROM changes;`)
.one();
return result ? Number(result["MAX(version)"]) : 0;
};
}

View file

@ -0,0 +1,79 @@
import { DurableObject } from "cloudflare:workers";
import { DurableChangesRepository } from "./changes";
import { ExcalidrawSyncServer } from "../sync/server";
import type { ExcalidrawElement } from "../element/types";
/**
* Durable Object impl. of Excalidraw room.
*/
export class DurableRoom extends DurableObject {
private roomId: string | null = null;
private sync: ExcalidrawSyncServer;
private snapshot!: {
appState: Record<string, any>;
elements: Map<string, ExcalidrawElement>;
version: number;
};
constructor(ctx: DurableObjectState, env: Env) {
super(ctx, env);
this.ctx.blockConcurrencyWhile(async () => {
// TODO: snapshot should likely be a transient store
// TODO: loaded the latest state from the db
this.snapshot = {
// TODO: start persisting acknowledged version (not a scene version!)
// TODO: we don't persist appState, should we?
appState: {},
elements: new Map(),
version: 0,
};
this.roomId = (await this.ctx.storage.get("roomId")) || null;
});
this.sync = new ExcalidrawSyncServer(
new DurableChangesRepository(ctx.storage),
);
// in case it hibernates, let's get take active connections
for (const ws of this.ctx.getWebSockets()) {
this.sync.onConnect(ws);
}
}
public fetch = async (request: Request): Promise<Response> =>
this.connect(request);
public webSocketMessage = (client: WebSocket, message: string) =>
this.sync.onMessage(client, message);
public webSocketClose = (ws: WebSocket) => this.sync.onDisconnect(ws);
private connect(request: Request) {
if (!this.roomId) {
const roomId = new URL(request.url).searchParams.get("roomId");
if (!roomId) {
return new Response(null, { status: 400 /* bad request */ });
}
this.ctx.blockConcurrencyWhile(async () => {
await this.ctx.storage.put("roomId", roomId);
this.roomId = roomId;
});
}
const { 0: client, 1: server } = new WebSocketPair();
this.ctx.acceptWebSocket(client);
this.sync.onConnect(client);
return new Response(null, {
status: 101 /* switching protocols */,
webSocket: server,
});
}
}

View file

@ -0,0 +1,39 @@
export { DurableRoom } from "./room";
/**
* Worker relay for Durable Room.
*/
export default {
// TODO: ensure it's wss in the prod
async fetch(
request: Request,
env: Env,
ctx: ExecutionContext,
): Promise<Response> {
// TODO: only auth user should reach this
const upgrade = request.headers.get("upgrade");
if (!upgrade || upgrade !== "websocket") {
return new Response(null, { status: 426 /* upgrade required */ });
}
if (request.method !== "GET") {
return new Response(null, { status: 405 /* method not allowed */ });
}
const url = new URL(request.url);
if (url.pathname !== "/connect") {
return new Response(null, { status: 403 /* forbidden */ });
}
// TODO: double check that the scene exists
const roomId = url.searchParams.get("roomId");
if (!roomId) {
return new Response(null, { status: 400 /* bad request */ });
}
const id: DurableObjectId = env.DURABLE_ROOM.idFromName(roomId);
const room = env.DURABLE_ROOM.get(id);
return room.fetch(request);
},
};

View file

@ -62,6 +62,8 @@
"@excalidraw/random-username": "1.1.0",
"@radix-ui/react-popover": "1.0.3",
"@radix-ui/react-tabs": "1.0.2",
"@types/async-lock": "1.4.2",
"async-lock": "^1.4.1",
"browser-fs-access": "0.29.1",
"canvas-roundrect-polyfill": "0.0.1",
"clsx": "1.1.1",
@ -96,10 +98,12 @@
"@babel/preset-env": "7.24.5",
"@babel/preset-react": "7.24.1",
"@babel/preset-typescript": "7.24.1",
"@cloudflare/workers-types": "^4.20241112.0",
"@size-limit/preset-big-lib": "9.0.0",
"@testing-library/dom": "10.4.0",
"@testing-library/jest-dom": "5.16.2",
"@testing-library/react": "16.0.0",
"@types/async-lock": "^1.4.2",
"@types/pako": "1.0.3",
"@types/pica": "5.1.3",
"@types/resize-observer-browser": "0.1.7",
@ -124,7 +128,8 @@
"size-limit": "9.0.0",
"style-loader": "3.3.3",
"ts-loader": "9.3.1",
"typescript": "4.9.4"
"typescript": "4.9.4",
"wrangler": "^3.60.3"
},
"bugs": "https://github.com/excalidraw/excalidraw/issues",
"homepage": "https://github.com/excalidraw/excalidraw/tree/master/packages/excalidraw",
@ -134,6 +139,9 @@
"pack": "yarn build:umd && yarn pack",
"start": "node ../../scripts/buildExample.mjs && vite",
"build:example": "node ../../scripts/buildExample.mjs",
"size": "yarn build:umd && size-limit"
"size": "yarn build:umd && size-limit",
"cf:deploy": "wrangler deploy",
"cf:dev": "wrangler dev",
"cf:typegen": "wrangler types"
}
}

View file

@ -0,0 +1,143 @@
import { Utils } from "./utils";
import type { CLIENT_CHANGE, SERVER_CHANGE } from "./protocol";
class ExcalidrawSyncClient {
// TODO: add prod url
private static readonly HOST_URL = "ws://localhost:8787";
private roomId: string;
private lastAcknowledgedVersion: number;
private server: WebSocket | null = null;
constructor(roomId: string = "test_room_1") {
this.roomId = roomId;
// TODO: persist in idb
this.lastAcknowledgedVersion = 0;
}
public connect() {
this.server = new WebSocket(
`${ExcalidrawSyncClient.HOST_URL}/connect?roomId=${this.roomId}`,
);
this.server.addEventListener("open", this.onOpen);
this.server.addEventListener("message", this.onMessage);
this.server.addEventListener("close", this.onClose);
this.server.addEventListener("error", this.onError);
}
public disconnect() {
if (this.server) {
this.server.removeEventListener("open", this.onOpen);
this.server.removeEventListener("message", this.onMessage);
this.server.removeEventListener("close", this.onClose);
this.server.removeEventListener("error", this.onError);
this.server.close();
}
}
private onOpen = () => this.sync();
// TODO: could be an array buffer
private onMessage = (event: MessageEvent) => {
const [result, error] = Utils.try(() => JSON.parse(event.data as string));
if (error) {
console.error("Failed to parse message:", event.data);
return;
}
const { type, payload } = result;
switch (type) {
case "relayed":
return this.handleRelayed(payload);
case "acknowledged":
return this.handleAcknowledged(payload);
case "rejected":
return this.handleRejected(payload);
default:
console.error("Unknown message type:", type);
}
};
private onClose = () => this.disconnect();
private onError = (error: Event) => console.error("WebSocket error:", error);
public sync() {
const remoteChanges = this.send({
type: "pull",
payload: { lastAcknowledgedVersion: this.lastAcknowledgedVersion },
});
// TODO: apply remote changes
// const localChanges: Array<CLIENT_CHANGE> = [];
// // TODO: apply local changes (unacknowledged)
// this.push(localChanges, 'durable');
}
public pull() {
return this.send({
type: "pull",
payload: { lastAcknowledgedVersion: this.lastAcknowledgedVersion },
});
}
public push(changes: Array<CLIENT_CHANGE>, type: "durable" | "ephemeral") {
return this.send({
type: "push",
payload: { type, changes },
});
}
public relay(buffer: ArrayBuffer) {
return this.send({
type: "relay",
payload: { buffer },
});
}
private handleMessage(message: string) {
const [result, error] = Utils.try(() => JSON.parse(message));
if (error) {
console.error("Failed to parse message:", message);
return;
}
const { type, payload } = result;
switch (type) {
case "relayed":
return this.handleRelayed(payload);
case "acknowledged":
return this.handleAcknowledged(payload);
case "rejected":
return this.handleRejected(payload);
default:
console.error("Unknown message type:", type);
}
}
private handleRelayed(payload: { changes: Array<CLIENT_CHANGE> }) {
console.log("Relayed message received:", payload);
// Process relayed changes
}
private handleAcknowledged(payload: { changes: Array<SERVER_CHANGE> }) {
console.log("Acknowledged message received:", payload);
// Handle acknowledged changes
}
private handleRejected(payload: { ids: Array<string>; message: string }) {
console.error("Rejected message received:", payload);
// Handle rejected changes
}
private send(message: { type: string; payload: any }) {
if (this.server && this.server.readyState === WebSocket.OPEN) {
this.server.send(JSON.stringify(message));
} else {
console.error("WebSocket is not open. Unable to send message.");
}
}
}

View file

@ -0,0 +1,40 @@
export type RELAY_PAYLOAD = { buffer: ArrayBuffer };
export type PULL_PAYLOAD = { lastAcknowledgedVersion: number };
export type PUSH_PAYLOAD = {
type: "durable" | "ephemeral";
changes: Array<CLIENT_CHANGE>;
};
export type CLIENT_CHANGE = {
id: string;
appStateChange: any;
elementsChange: any;
};
export type CLIENT_MESSAGE =
| { type: "relay"; payload: RELAY_PAYLOAD }
| { type: "pull"; payload: PULL_PAYLOAD }
| { type: "push"; payload: PUSH_PAYLOAD };
export type SERVER_CHANGE = { id: string; version: number; payload: string };
export type SERVER_MESSAGE =
| {
type: "relayed";
payload: { changes: Array<CLIENT_CHANGE> } | RELAY_PAYLOAD;
}
| { type: "acknowledged"; payload: { changes: Array<SERVER_CHANGE> } }
| { type: "rejected"; payload: { ids: Array<string>; message: string } };
export interface ChangesRepository {
saveAll(changes: Array<CLIENT_CHANGE>): Array<SERVER_CHANGE>;
getSinceVersion(version: number): Array<SERVER_CHANGE>;
getLastVersion(): number;
}
// TODO: should come from the shared types package
export type ExcalidrawElement = {
id: string;
type: any;
version: number;
[key: string]: any;
};

View file

@ -0,0 +1,155 @@
import AsyncLock from "async-lock";
import { Utils } from "./utils";
import type {
ChangesRepository,
CLIENT_CHANGE,
CLIENT_MESSAGE,
PULL_PAYLOAD,
PUSH_PAYLOAD,
RELAY_PAYLOAD,
SERVER_MESSAGE,
} from "./protocol";
// TODO: message could be binary (cbor, protobuf, etc.)
/**
* Core excalidraw sync logic.
*/
export class ExcalidrawSyncServer {
private readonly lock: AsyncLock = new AsyncLock();
private readonly sessions: Set<WebSocket> = new Set();
constructor(private readonly changesRepository: ChangesRepository) {}
public onConnect(client: WebSocket) {
this.sessions.add(client);
}
public onDisconnect(client: WebSocket) {
this.sessions.delete(client);
}
public onMessage(client: WebSocket, message: string) {
const [result, error] = Utils.try<CLIENT_MESSAGE>(() =>
JSON.parse(message),
);
if (error) {
console.error(error);
return;
}
const { type, payload } = result;
switch (type) {
case "relay":
return this.relay(client, payload);
case "pull":
return this.pull(client, payload);
case "push":
// apply each one-by-one to avoid race conditions
// TODO: in theory we do not need to block ephemeral appState changes
return this.lock.acquire("push", () => this.push(client, payload));
default:
console.error(`Unknown message type: ${type}`);
}
}
private pull(client: WebSocket, payload: PULL_PAYLOAD) {
// TODO: test for invalid payload
const lastAcknowledgedClientVersion = payload.lastAcknowledgedVersion;
const lastAcknowledgedServerVersion =
this.changesRepository.getLastVersion();
const versionΔ =
lastAcknowledgedServerVersion - lastAcknowledgedClientVersion;
if (versionΔ === 0) {
console.info(`Client is up to date!`);
return;
}
if (versionΔ < 0) {
// TODO: restore the client from the snapshot / deltas?
console.error(
`Panic! Client claims to have higher acknowledged version than the latest one on the server!`,
);
return;
}
if (versionΔ > 0) {
const changes = this.changesRepository.getSinceVersion(
lastAcknowledgedClientVersion,
);
this.send(client, {
type: "acknowledged",
payload: {
changes,
},
});
}
}
private push(client: WebSocket, payload: PUSH_PAYLOAD) {
const { type, changes } = payload;
switch (type) {
case "ephemeral":
return this.relay(client, { changes });
case "durable":
const [acknowledged, error] = Utils.try(() => {
// TODO: try to apply the changes to the snapshot
return this.changesRepository.saveAll(changes);
});
if (error) {
return this.send(client, {
type: "rejected",
payload: {
ids: changes.map((i) => i.id),
message: error.message,
},
});
}
return this.broadcast({
type: "acknowledged",
payload: {
changes: acknowledged,
},
});
default:
console.error(`Unknown message type: ${type}`);
}
}
private relay(
client: WebSocket,
payload: { changes: Array<CLIENT_CHANGE> } | RELAY_PAYLOAD,
) {
return this.broadcast(
{
type: "relayed",
payload,
},
client,
);
}
private send(client: WebSocket, message: SERVER_MESSAGE) {
const msg = JSON.stringify(message);
client.send(msg);
}
private broadcast(message: SERVER_MESSAGE, exclude?: WebSocket) {
const msg = JSON.stringify(message);
for (const ws of this.sessions) {
if (ws === exclude) {
continue;
}
ws.send(msg);
}
}
}

View file

@ -0,0 +1,18 @@
export const Utils = {
try<T>(cb: () => T): [T, null] | [null, Error] {
try {
const result = cb();
return [result, null];
} catch (error) {
if (error instanceof Error) {
return [null, error];
}
if (typeof error === "string") {
return [null, new Error(error)];
}
return [null, new Error("Unknown error")];
}
},
};

View file

@ -10,6 +10,9 @@
"module": "ESNext",
"moduleResolution": "Node",
"resolveJsonModule": true,
"jsx": "react-jsx"
"jsx": "react-jsx",
"types": [
"@cloudflare/workers-types/2023-07-01"
],
}
}

View file

@ -0,0 +1,7 @@
// Generated by Wrangler by running `wrangler types`
interface Env {
DURABLE_ROOM: DurableObjectNamespace<
import("./cloudflare/worker").DurableRoom
>;
}

View file

@ -0,0 +1,23 @@
#:schema node_modules/wrangler/config-schema.json
name = "excalidraw-sync"
main = "cloudflare/worker.ts"
compatibility_date = "2024-11-12"
# Workers Logs
# Docs: https://developers.cloudflare.com/workers/observability/logs/workers-logs/
# Configuration: https://developers.cloudflare.com/workers/observability/logs/workers-logs/#enable-workers-logs
[observability]
enabled = true
# Bind a Durable Object. Durable objects are a scale-to-zero compute primitive based on the actor model.
# Durable Objects can live for as long as needed. Use these when you need a long-running "server", such as in realtime apps.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#durable-objects
[[durable_objects.bindings]]
name = "DURABLE_ROOM"
class_name = "DurableRoom"
# Durable Object migrations.
# Docs: https://developers.cloudflare.com/workers/wrangler/configuration/#migrations
[[migrations]]
tag = "v1"
new_sqlite_classes = ["DurableRoom"]