Ditching strings and exchanging buffers

This commit is contained in:
Marcel Mraz 2025-01-23 23:04:29 +01:00
parent cdd7f6158b
commit 05ba0339fe
No known key found for this signature in database
GPG key ID: 4EBD6E62DC830CD2
8 changed files with 258 additions and 74 deletions

View file

@ -31,6 +31,7 @@ export class DurableDeltasRepository implements DeltasRepository {
}
try {
// CFDO: could be also a buffer
const payload = JSON.stringify(delta);
const payloadSize = new TextEncoder().encode(payload).byteLength;
const nextVersion = this.getLastVersion() + 1;
@ -113,6 +114,7 @@ export class DurableDeltasRepository implements DeltasRepository {
return restoredDeltas[0];
}
// CFDO: fix types (should be buffer in the first place)
private restoreServerDeltas(deltas: SERVER_DELTA[]): SERVER_DELTA[] {
return Array.from(
deltas
@ -137,6 +139,10 @@ export class DurableDeltasRepository implements DeltasRepository {
return acc;
}, new Map<number, SERVER_DELTA>())
.values(),
);
// CFDO: temporary
).map((delta) => ({
...delta,
payload: JSON.parse(delta.payload),
}));
}
}

View file

@ -46,7 +46,7 @@ export class DurableRoom extends DurableObject {
public fetch = async (request: Request): Promise<Response> =>
this.connect(request);
public webSocketMessage = (client: WebSocket, message: string) =>
public webSocketMessage = (client: WebSocket, message: ArrayBuffer) =>
this.sync.onMessage(client, message);
public webSocketClose = (ws: WebSocket) => this.sync.onDisconnect(ws);

View file

@ -76,6 +76,7 @@
"jotai-scope": "0.7.2",
"lodash.debounce": "4.0.8",
"lodash.throttle": "4.1.1",
"msgpack-lite": "0.1.26",
"nanoid": "3.3.3",
"open-color": "1.9.1",
"pako": "1.0.11",
@ -106,6 +107,7 @@
"@testing-library/jest-dom": "5.16.2",
"@testing-library/react": "16.0.0",
"@types/async-lock": "^1.4.2",
"@types/msgpack-lite": "0.1.11",
"@types/pako": "1.0.3",
"@types/pica": "5.1.3",
"@types/resize-observer-browser": "0.1.7",

View file

@ -12,6 +12,7 @@ import type {
OrderedExcalidrawElement,
SceneElementsMap,
} from "./element/types";
import type { SERVER_DELTA } from "./sync/protocol";
import { arrayToMap, assertNever } from "./utils";
import { hashElementsVersion } from "./element";
import { syncMovedIndices } from "./fractionalIndex";
@ -449,14 +450,11 @@ export class StoreDelta {
/**
* Parse and load the delta from the remote payload.
*/
// CFDO: why it would be a string if it can be a DTO?
public static load(payload: string) {
public static load({
id,
elements: { added, removed, updated },
}: SERVER_DELTA["payload"]) {
// CFDO: ensure typesafety
const {
id,
elements: { added, removed, updated },
} = JSON.parse(payload);
const elements = ElementsDelta.create(added, removed, updated, {
shouldRedistribute: false,
});

View file

@ -1,5 +1,6 @@
/* eslint-disable no-console */
import throttle from "lodash.throttle";
import msgpack from "msgpack-lite";
import ReconnectingWebSocket, {
type Event,
type CloseEvent,
@ -14,7 +15,12 @@ import { StoreAction, StoreDelta } from "../store";
import type { StoreChange } from "../store";
import type { ExcalidrawImperativeAPI } from "../types";
import type { ExcalidrawElement, SceneElementsMap } from "../element/types";
import type { CLIENT_MESSAGE_RAW, SERVER_DELTA, CHANGE } from "./protocol";
import type {
CLIENT_MESSAGE_RAW,
SERVER_DELTA,
CHANGE,
SERVER_MESSAGE,
} from "./protocol";
import { debounce } from "../utils";
import { randomId } from "../random";
import { orderByFractionalIndex } from "../fractionalIndex";
@ -22,7 +28,7 @@ import { orderByFractionalIndex } from "../fractionalIndex";
class SocketMessage implements CLIENT_MESSAGE_RAW {
constructor(
public readonly type: "relay" | "pull" | "push",
public readonly payload: string,
public readonly payload: Uint8Array,
public readonly chunkInfo?: {
id: string;
position: number;
@ -49,7 +55,7 @@ class SocketClient {
private readonly handlers: {
onOpen: (event: Event) => void;
onOnline: () => void;
onMessage: (event: MessageEvent) => void;
onMessage: (message: SERVER_MESSAGE) => void;
},
) {}
@ -138,12 +144,12 @@ class SocketClient {
const { type, payload } = message;
// CFDO II: could be slowish for large payloads, thing about a better solution (i.e. msgpack 10x faster, 2x smaller)
const stringifiedPayload = JSON.stringify(payload);
const payloadSize = new TextEncoder().encode(stringifiedPayload).byteLength;
const payloadBuffer = msgpack.encode(payload) as Uint8Array;
const payloadSize = payloadBuffer.byteLength;
if (payloadSize < SocketClient.MAX_MESSAGE_SIZE) {
const message = new SocketMessage(type, stringifiedPayload);
return this.socket?.send(JSON.stringify(message));
const message = new SocketMessage(type, payloadBuffer);
return this.sendMessage(message);
}
const chunkId = randomId();
@ -153,19 +159,25 @@ class SocketClient {
for (let position = 0; position < chunksCount; position++) {
const start = position * chunkSize;
const end = start + chunkSize;
const chunkedPayload = stringifiedPayload.slice(start, end);
const chunkedPayload = payloadBuffer.subarray(start, end);
const message = new SocketMessage(type, chunkedPayload, {
id: chunkId,
position,
count: chunksCount,
});
this.socket?.send(JSON.stringify(message));
this.sendMessage(message);
}
}
private onMessage = (event: MessageEvent) => {
this.handlers.onMessage(event);
this.receiveMessage(event.data).then((message) => {
if (!message) {
return;
}
this.handlers.onMessage(message);
});
};
private onOpen = (event: Event) => {
@ -184,6 +196,68 @@ class SocketClient {
event,
);
};
private sendMessage = ({ payload, ...metadata }: CLIENT_MESSAGE_RAW) => {
const metadataBuffer = msgpack.encode(metadata) as Uint8Array;
// contains the length of the rest of the message, so that we could decode it server side
const headerBuffer = new ArrayBuffer(4);
new DataView(headerBuffer).setUint32(0, metadataBuffer.byteLength);
// concatenate into [header(4 bytes)][metadata][payload]
const message = Uint8Array.from([
...new Uint8Array(headerBuffer),
...metadataBuffer,
...payload,
]);
// CFDO: add dev-level logging
{
const headerLength = 4;
const header = new Uint8Array(message.buffer, 0, headerLength);
const metadataLength = new DataView(
header.buffer,
header.byteOffset,
).getUint32(0);
const metadata = new Uint8Array(
message.buffer,
headerLength,
headerLength + metadataLength,
);
const payload = new Uint8Array(
message.buffer,
headerLength + metadataLength,
);
console.log({
...msgpack.decode(metadata),
payload,
});
}
this.socket?.send(message);
};
private async receiveMessage(
message: Blob,
): Promise<SERVER_MESSAGE | undefined> {
const arrayBuffer = await message.arrayBuffer();
const uint8Array = new Uint8Array(arrayBuffer);
const [decodedMessage, decodeError] = Utils.try<SERVER_MESSAGE>(() =>
msgpack.decode(uint8Array),
);
if (decodeError) {
console.error("Failed to decode message:", message);
return;
}
// CFDO: should be type-safe
return decodedMessage;
}
}
interface AcknowledgedDelta {
@ -351,23 +425,17 @@ export class SyncClient {
this.push();
};
private onMessage = (event: MessageEvent) => {
// CFDO: could be an array buffer
const [result, error] = Utils.try(() => JSON.parse(event.data as string));
private onMessage = ({ type, payload }: SERVER_MESSAGE) => {
// CFDO: add dev-level logging
console.log({ type, payload });
if (error) {
console.error("Failed to parse message:", event.data);
return;
}
const { type, payload } = result;
switch (type) {
case "relayed":
return this.handleRelayed(payload);
case "acknowledged":
return this.handleAcknowledged(payload);
case "rejected":
return this.handleRejected(payload);
// case "rejected":
// return this.handleRejected(payload);
default:
console.error("Unknown message type:", type);
}
@ -499,7 +567,7 @@ export class SyncClient {
private handleRejected = (payload: {
ids: Array<string>;
message: string;
message: Uint8Array;
}) => {
// handle rejected deltas
console.error("Rejected message received:", payload);

View file

@ -16,7 +16,7 @@ export type CHUNK_INFO = {
export type CLIENT_MESSAGE_RAW = {
type: "relay" | "pull" | "push";
payload: string;
payload: Uint8Array;
chunkInfo?: CHUNK_INFO;
};
@ -26,7 +26,12 @@ export type CLIENT_MESSAGE = { chunkInfo: CHUNK_INFO } & (
| { type: "push"; payload: PUSH_PAYLOAD }
);
export type SERVER_DELTA = { id: string; version: number; payload: string };
export type SERVER_DELTA = {
id: string;
version: number;
// CFDO: should be type-safe
payload: Record<string, any>;
};
export type SERVER_MESSAGE =
| {
type: "relayed";

View file

@ -1,4 +1,5 @@
import AsyncLock from "async-lock";
import msgpack from "msgpack-lite";
import { Utils } from "./utils";
import type {
@ -37,10 +38,35 @@ export class ExcalidrawSyncServer {
this.sessions.delete(client);
}
public onMessage(client: WebSocket, message: string): Promise<void> | void {
public onMessage(
client: WebSocket,
message: ArrayBuffer,
): Promise<void> | void {
const [parsedMessage, parseMessageError] = Utils.try<CLIENT_MESSAGE_RAW>(
() => {
return JSON.parse(message);
const headerLength = 4;
const header = new Uint8Array(message, 0, headerLength);
const metadataLength = new DataView(
header.buffer,
header.byteOffset,
).getUint32(0);
const metadata = new Uint8Array(
message,
headerLength,
headerLength + metadataLength,
);
const payload = new Uint8Array(message, headerLength + metadataLength);
const parsed = {
...msgpack.decode(metadata),
payload,
};
// CFDO: add dev-level logging
console.log({ parsed });
return parsed;
},
);
@ -56,29 +82,7 @@ export class ExcalidrawSyncServer {
return this.processChunks(client, { type, payload, chunkInfo });
}
const [parsedPayload, parsePayloadError] = Utils.try<
CLIENT_MESSAGE["payload"]
>(() => JSON.parse(payload));
if (parsePayloadError) {
console.error(parsePayloadError);
return;
}
switch (type) {
case "relay":
return this.relay(client, parsedPayload as RELAY_PAYLOAD);
case "pull":
return this.pull(client, parsedPayload as PULL_PAYLOAD);
case "push":
// apply each one-by-one to avoid race conditions
// CFDO: in theory we do not need to block ephemeral appState changes
return this.lock.acquire("push", () =>
this.push(client, parsedPayload as PUSH_PAYLOAD),
);
default:
console.error(`Unknown message type: ${type}`);
}
return this.processMessage(client, parsedMessage);
}
/**
@ -118,16 +122,18 @@ export class ExcalidrawSyncServer {
// hopefully we can fit into the 128 MiB memory limit
const restoredPayload = Array.from(chunks)
.sort((a, b) => (a <= b ? -1 : 1))
.reduce((acc, [_, payload]) => (acc += payload), "");
.sort(([positionA], [positionB]) => (positionA <= positionB ? -1 : 1))
.reduce(
(acc, [_, payload]) => Uint8Array.from([...acc, ...payload]),
new Uint8Array(),
);
const rawMessage = JSON.stringify({
const rawMessage = {
type,
payload: restoredPayload,
} as CLIENT_MESSAGE_RAW);
};
// process the message
return this.onMessage(client, rawMessage);
return this.processMessage(client, rawMessage);
} catch (error) {
console.error(`Error while processing chunk "${id}"`, error);
} finally {
@ -138,6 +144,35 @@ export class ExcalidrawSyncServer {
}
}
private processMessage(
client: WebSocket,
{ type, payload }: Omit<CLIENT_MESSAGE_RAW, "chunkInfo">,
) {
const [parsedPayload, parsePayloadError] = Utils.try<
CLIENT_MESSAGE["payload"]
>(() => msgpack.decode(payload));
if (parsePayloadError) {
console.error(parsePayloadError);
return;
}
switch (type) {
case "relay":
return this.relay(client, parsedPayload as RELAY_PAYLOAD);
case "pull":
return this.pull(client, parsedPayload as PULL_PAYLOAD);
case "push":
// apply each one-by-one to avoid race conditions
// CFDO: in theory we do not need to block ephemeral appState changes
return this.lock.acquire("push", () =>
this.push(client, parsedPayload as PUSH_PAYLOAD),
);
default:
console.error(`Unknown message type: ${type}`);
}
}
private relay(client: WebSocket, payload: RELAY_PAYLOAD) {
// CFDO: we should likely apply these to the snapshot
return this.broadcast(
@ -205,19 +240,34 @@ export class ExcalidrawSyncServer {
}
private send(client: WebSocket, message: SERVER_MESSAGE) {
const msg = JSON.stringify(message);
client.send(msg);
const [encodedMessage, encodeError] = Utils.try<Uint8Array>(() =>
msgpack.encode(message),
);
if (encodeError) {
console.error(encodeError);
return;
}
client.send(encodedMessage);
}
private broadcast(message: SERVER_MESSAGE, exclude?: WebSocket) {
const msg = JSON.stringify(message);
const [encodedMessage, encodeError] = Utils.try<Uint8Array>(() =>
msgpack.encode(message),
);
if (encodeError) {
console.error(encodeError);
return;
}
for (const ws of this.sessions) {
if (ws === exclude) {
continue;
}
ws.send(msg);
ws.send(encodedMessage);
}
}
}