Sharding rows due to SQLite limits

Marcel Mraz 2024-12-30 13:44:53 +01:00
parent 12be5d716b
commit f6061f5ec6
7 changed files with 228 additions and 194 deletions

View file

@ -6,71 +6,87 @@ import type {
// CFDO: add senderId, possibly roomId as well
export class DurableIncrementsRepository implements IncrementsRepository {
// there is a 2 MB row limit, hence using a max payload size of 1.5 MB
// and leaving a buffer for the other row metadata
private static readonly MAX_PAYLOAD_SIZE = 1_500_000;
constructor(private storage: DurableObjectStorage) {
// #region DEV ONLY
// this.storage.sql.exec(`DROP TABLE IF EXISTS increments;`);
// #endregion
// CFDO: payload has just 2MB limit, which might not be enough
this.storage.sql.exec(`CREATE TABLE IF NOT EXISTS increments(
version INTEGER PRIMARY KEY AUTOINCREMENT,
id TEXT NOT NULL UNIQUE,
id TEXT NOT NULL,
version INTEGER NOT NULL,
position INTEGER NOT NULL,
payload TEXT NOT NULL,
createdAt TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
payload TEXT
PRIMARY KEY (id, version, position)
);`);
}
public saveAll(increments: Array<CLIENT_INCREMENT>) {
public save(increment: CLIENT_INCREMENT): SERVER_INCREMENT | null {
return this.storage.transactionSync(() => {
const prevVersion = this.getLastVersion();
const acknowledged: Array<SERVER_INCREMENT> = [];
const existingIncrement = this.getById(increment.id);
// don't persist the same increment twice
if (existingIncrement) {
return existingIncrement;
}
try {
const payload = JSON.stringify(increment);
const payloadSize = new TextEncoder().encode(payload).byteLength;
const chunkVersion = this.getLastVersion() + 1;
const chunksCount = Math.ceil(
payloadSize / DurableIncrementsRepository.MAX_PAYLOAD_SIZE,
);
for (let position = 0; position < chunksCount; position++) {
const start = position * DurableIncrementsRepository.MAX_PAYLOAD_SIZE;
const end = start + DurableIncrementsRepository.MAX_PAYLOAD_SIZE;
// slicing the chunk payload
const chunkedPayload = payload.slice(start, end);
for (const increment of increments) {
try {
// unique id ensures that we don't acknowledge the same increment twice
this.storage.sql.exec(
`INSERT INTO increments (id, payload) VALUES (?, ?);`,
`INSERT INTO increments (id, version, position, payload) VALUES (?, ?, ?, ?);`,
increment.id,
JSON.stringify(increment),
chunkVersion,
position,
chunkedPayload,
);
} catch (e) {
// check if the increment has already been acknowledged,
// in case the client for some reason did not receive the acknowledgement
// and reconnected while we still have the increment in the worker;
// otherwise the client is doomed to a full restore
if (
e instanceof Error &&
e.message.includes(
"UNIQUE constraint failed: increments.id: SQLITE_CONSTRAINT",
)
) {
acknowledged.push(this.getById(increment.id));
continue;
}
}
} catch (e) {
// check if the increment has already been acknowledged,
// in case the client for some reason did not receive the acknowledgement
// and reconnected while we still have the increment in the worker;
// otherwise the client is doomed to a full restore
if (e instanceof Error && e.message.includes("SQLITE_CONSTRAINT")) {
// continue;
} else {
throw e;
}
}
// query the just added increments
acknowledged.push(...this.getSinceVersion(prevVersion));
const acknowledged = this.getById(increment.id);
return acknowledged;
});
}
public getSinceVersion(version: number): Array<SERVER_INCREMENT> {
// CFDO: for versioning we need deletions, but not for the "snapshot" update;
return this.storage.sql
// CFDO: for versioning we need deletions, but not for the "snapshot" update;
public getAllSinceVersion(version: number): Array<SERVER_INCREMENT> {
const increments = this.storage.sql
.exec<SERVER_INCREMENT>(
`SELECT id, payload, version FROM increments WHERE version > (?) ORDER BY version, createdAt ASC;`,
`SELECT id, payload, version FROM increments WHERE version > (?) ORDER BY version, position, createdAt ASC;`,
version,
)
.toArray();
return this.restoreServerIncrements(increments);
}
public getLastVersion(): number {
// CFDO: might be in memory to reduce number of rows read (or index on version at least, if btree affect rows read)
// CFDO: might be kept in memory to reduce the number of rows read (or at least an index on version, if the btree affects rows read)
const result = this.storage.sql
.exec(`SELECT MAX(version) FROM increments;`)
.one();
@ -78,12 +94,55 @@ export class DurableIncrementsRepository implements IncrementsRepository {
return result ? Number(result["MAX(version)"]) : 0;
}
public getById(id: string): SERVER_INCREMENT {
return this.storage.sql
public getById(id: string): SERVER_INCREMENT | null {
const increments = this.storage.sql
.exec<SERVER_INCREMENT>(
`SELECT id, payload, version FROM increments WHERE id = (?)`,
`SELECT id, payload, version FROM increments WHERE id = (?) ORDER BY position ASC`,
id,
)
.one();
.toArray();
if (!increments.length) {
return null;
}
const restoredIncrements = this.restoreServerIncrements(increments);
if (restoredIncrements.length !== 1) {
throw new Error(
`Expected exactly one restored increment, but received "${restoredIncrements.length}".`,
);
}
return restoredIncrements[0];
}
private restoreServerIncrements(
increments: SERVER_INCREMENT[],
): SERVER_INCREMENT[] {
return Array.from(
increments
.reduce((acc, curr) => {
const increment = acc.get(curr.version);
if (increment) {
acc.set(curr.version, {
...increment,
// gluing the chunk payloads back together
payload: increment.payload + curr.payload,
});
} else {
// let's not unnecessarily expose more props than these
acc.set(curr.version, {
id: curr.id,
version: curr.version,
payload: curr.payload,
});
}
return acc;
}, new Map<number, SERVER_INCREMENT>())
.values(),
);
}
}

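To illustrate the sharding scheme above, here is a minimal sketch of the split/reassemble round trip, assuming the 1.5 MB payload cap and the (id, version, position) layout from this commit; `splitChunks`, `joinChunks`, and `ChunkRow` are hypothetical helpers, not part of the repository API.

```ts
// Hypothetical helpers illustrating the chunking scheme; not part of the repository API.
const MAX_PAYLOAD_SIZE = 1_500_000; // mirrors DurableIncrementsRepository.MAX_PAYLOAD_SIZE

type ChunkRow = { id: string; version: number; position: number; payload: string };

// Split a stringified increment into rows small enough for the 2 MB SQLite row limit.
// Note: slice() counts UTF-16 code units, so the byte size per chunk may differ for non-ASCII payloads.
function splitChunks(id: string, version: number, payload: string): ChunkRow[] {
  const count = Math.ceil(payload.length / MAX_PAYLOAD_SIZE) || 1;
  const rows: ChunkRow[] = [];
  for (let position = 0; position < count; position++) {
    const start = position * MAX_PAYLOAD_SIZE;
    rows.push({ id, version, position, payload: payload.slice(start, start + MAX_PAYLOAD_SIZE) });
  }
  return rows;
}

// Glue the rows back into the original payload, ordered by position.
function joinChunks(rows: ChunkRow[]): string {
  return rows
    .slice()
    .sort((a, b) => a.position - b.position)
    .map((row) => row.payload)
    .join("");
}
```

A round trip should satisfy `joinChunks(splitChunks(id, version, payload)) === payload`, which is the property `getById` and `restoreServerIncrements` rely on.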
View file

@ -16,7 +16,6 @@ import type { SceneElementsMap } from "../element/types";
import type {
CLIENT_INCREMENT,
CLIENT_MESSAGE_RAW,
PUSH_PAYLOAD,
SERVER_INCREMENT,
} from "./protocol";
import { debounce } from "../utils";
@ -26,14 +25,10 @@ class SocketMessage implements CLIENT_MESSAGE_RAW {
constructor(
public readonly type: "relay" | "pull" | "push",
public readonly payload: string,
public readonly chunkInfo: {
public readonly chunkInfo?: {
id: string;
order: number;
position: number;
count: number;
} = {
id: "",
order: 0,
count: 1,
},
) {}
}
@ -100,7 +95,7 @@ class SocketClient {
maxRetries: Infinity, // maximum number of retries
maxEnqueuedMessages: 0, // maximum number of messages to buffer until reconnection
startClosed: false, // start websocket in CLOSED state, call `.reconnect()` to connect
debug: true, // enables debug output,
debug: false, // enables debug output,
},
);
this.socket.addEventListener("message", this.onMessage);
@ -181,13 +176,13 @@ class SocketClient {
const chunkSize = SocketClient.MAX_MESSAGE_SIZE;
const chunksCount = Math.ceil(payloadSize / chunkSize);
for (let i = 0; i < chunksCount; i++) {
const start = i * chunkSize;
for (let position = 0; position < chunksCount; position++) {
const start = position * chunkSize;
const end = start + chunkSize;
const chunkedPayload = stringifiedPayload.slice(start, end);
const message = new SocketMessage(type, chunkedPayload, {
id: chunkId,
order: i,
position,
count: chunksCount,
});
@ -289,15 +284,11 @@ export class SyncClient {
api: ExcalidrawImperativeAPI,
repository: IncrementsRepository & MetadataRepository,
) {
const [queue, metadata] = await Promise.all([
SyncQueue.create(repository),
repository.loadMetadata(),
]);
const queue = await SyncQueue.create(repository);
return new SyncClient(api, repository, queue, {
host: SyncClient.HOST_URL,
roomId: SyncClient.ROOM_ID,
lastAcknowledgedVersion: metadata?.lastAcknowledgedVersion ?? 0,
lastAcknowledgedVersion: 0,
});
}
// #endregion
@ -320,22 +311,19 @@ export class SyncClient {
});
}
public push(
type: "durable" | "ephemeral" = "durable",
...increments: Array<CLIENT_INCREMENT>
): void {
const payload: PUSH_PAYLOAD = { type, increments: [] };
if (type === "durable") {
this.queue.add(...increments);
// batch all (already) queued increments
payload.increments = this.queue.getAll();
} else {
payload.increments = increments;
public push(increment?: StoreIncrement): void {
if (increment) {
this.queue.add(increment);
}
if (payload.increments.length > 0) {
this.client.send({ type: "push", payload });
// re-send all already queued increments
for (const queuedIncrement of this.queue.getAll()) {
this.client.send({
type: "push",
payload: {
...queuedIncrement,
},
});
}
}
@ -403,12 +391,6 @@ export class SyncClient {
version,
});
// a local increment does not need to be applied again
if (this.queue.has(id)) {
this.queue.remove(id);
continue;
}
// we've already applied this increment
if (version <= nextAcknowledgedVersion) {
continue;
@ -419,7 +401,7 @@ export class SyncClient {
} else {
// it's fine to apply increments out of order,
// as they are idempotent, so that we can re-apply them,
// as long as we don't mark them as acknowledged
// as long as we don't mark their version as acknowledged
console.debug(
`Received out of order increment, expected "${
nextAcknowledgedVersion + 1
@ -427,27 +409,32 @@ export class SyncClient {
);
}
// apply remote increment with higher version than the last acknowledged one
const remoteIncrement = StoreIncrement.load(payload);
[elements] = remoteIncrement.elementsChange.applyTo(
elements,
this.api.store.snapshot.elements,
);
}
// a local increment does not need to be applied again
if (this.queue.has(id)) {
this.queue.remove(id);
} else {
// apply remote increment with higher version than the last acknowledged one
const remoteIncrement = StoreIncrement.load(payload);
[elements] = remoteIncrement.elementsChange.applyTo(
elements,
this.api.store.snapshot.elements,
);
}
// apply local increments
for (const localIncrement of this.queue.getAll()) {
// CFDO: in theory only necessary when remote increments modified same element properties!
[elements] = localIncrement.elementsChange.applyTo(
elements,
this.api.store.snapshot.elements,
);
}
// apply local increments
for (const localIncrement of this.queue.getAll()) {
// CFDO: in theory only necessary when remote increments modified same element properties!
[elements] = localIncrement.elementsChange.applyTo(
elements,
this.api.store.snapshot.elements,
);
}
this.api.updateScene({
elements: Array.from(elements.values()),
storeAction: "update",
});
this.api.updateScene({
elements: Array.from(elements.values()),
storeAction: "update",
});
}
this.lastAcknowledgedVersion = nextAcknowledgedVersion;
} catch (e) {

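For reference, a sketch of the client-side chunked send performed by SocketClient above, assuming a 1 MiB MAX_MESSAGE_SIZE and UTF-16 code-unit slicing; `toChunkedMessages` and the use of `crypto.randomUUID()` as the chunk id generator are illustrative stand-ins, not the actual implementation.

```ts
type ChunkInfo = { id: string; position: number; count: number };
type OutgoingMessage = { type: "relay" | "pull" | "push"; payload: string; chunkInfo?: ChunkInfo };

const MAX_MESSAGE_SIZE = 1024 * 1024; // assumed 1 MiB WS message limit

// Split an outgoing payload into chunked messages; a single-chunk payload is sent without chunkInfo.
function toChunkedMessages(type: OutgoingMessage["type"], payload: string): OutgoingMessage[] {
  const count = Math.ceil(payload.length / MAX_MESSAGE_SIZE) || 1;
  if (count === 1) {
    return [{ type, payload }];
  }
  const id = crypto.randomUUID(); // stand-in for the real chunk id generator
  const messages: OutgoingMessage[] = [];
  for (let position = 0; position < count; position++) {
    const start = position * MAX_MESSAGE_SIZE;
    messages.push({
      type,
      payload: payload.slice(start, start + MAX_MESSAGE_SIZE),
      chunkInfo: { id, position, count },
    });
  }
  return messages;
}
```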
View file

@ -1,36 +1,36 @@
import type { StoreIncrement } from "../store";
import type { DTO } from "../utility-types";
export type CLIENT_INCREMENT = DTO<StoreIncrement>;
export type RELAY_PAYLOAD = { buffer: ArrayBuffer };
export type PULL_PAYLOAD = { lastAcknowledgedVersion: number };
export type PUSH_PAYLOAD = {
type: "durable" | "ephemeral";
increments: Array<CLIENT_INCREMENT>;
};
export type PUSH_PAYLOAD = CLIENT_INCREMENT;
export type CLIENT_INCREMENT = StoreIncrement;
export type CLIENT_MESSAGE_METADATA = {
export type CHUNK_INFO = {
id: string;
order: number;
position: number;
count: number;
};
export type CLIENT_MESSAGE_RAW = {
type: "relay" | "pull" | "push";
payload: string;
chunkInfo: CLIENT_MESSAGE_METADATA;
chunkInfo?: CHUNK_INFO;
};
export type CLIENT_MESSAGE =
export type CLIENT_MESSAGE = { chunkInfo: CHUNK_INFO } & (
| { type: "relay"; payload: RELAY_PAYLOAD }
| { type: "pull"; payload: PULL_PAYLOAD }
| { type: "push"; payload: PUSH_PAYLOAD };
| { type: "push"; payload: PUSH_PAYLOAD }
);
export type SERVER_INCREMENT = { id: string; version: number; payload: string };
export type SERVER_MESSAGE =
| {
type: "relayed";
payload: { increments: Array<CLIENT_INCREMENT> } | RELAY_PAYLOAD;
// CFDO: should likely be just elements
// payload: { increments: Array<CLIENT_INCREMENT> } | RELAY_PAYLOAD;
}
| { type: "acknowledged"; payload: { increments: Array<SERVER_INCREMENT> } }
| {
@ -39,8 +39,8 @@ export type SERVER_MESSAGE =
};
export interface IncrementsRepository {
saveAll(increments: Array<CLIENT_INCREMENT>): Array<SERVER_INCREMENT>;
getSinceVersion(version: number): Array<SERVER_INCREMENT>;
save(increment: CLIENT_INCREMENT): SERVER_INCREMENT | null;
getAllSinceVersion(version: number): Array<SERVER_INCREMENT>;
getLastVersion(): number;
}

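To make the reshaped protocol concrete, a sketch of what a chunked push and its acknowledgement might look like, assuming the types above and the "./protocol" module path used in this diff; the ids, version, and payload strings are made up for illustration.

```ts
import type { CLIENT_MESSAGE_RAW, SERVER_MESSAGE } from "./protocol";

// A push whose stringified payload exceeded the WS message limit arrives as two raw messages
// sharing the same chunkInfo.id and differing only in position:
const rawChunks: CLIENT_MESSAGE_RAW[] = [
  { type: "push", payload: "…first half…", chunkInfo: { id: "chunk-1", position: 0, count: 2 } },
  { type: "push", payload: "…second half…", chunkInfo: { id: "chunk-1", position: 1, count: 2 } },
];

// Once reassembled and persisted, the server broadcasts a single acknowledgement:
const acknowledgement: SERVER_MESSAGE = {
  type: "acknowledged",
  payload: {
    increments: [{ id: "increment-1", version: 42, payload: "…full stringified increment…" }],
  },
};
```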
View file

@ -2,8 +2,8 @@ import throttle from "lodash.throttle";
import type { StoreIncrement } from "../store";
export interface IncrementsRepository {
loadIncrements(): Promise<{ increments: Array<StoreIncrement> } | null>;
saveIncrements(params: { increments: Array<StoreIncrement> }): Promise<void>;
loadIncrements(): Promise<Array<StoreIncrement> | null>;
saveIncrements(params: StoreIncrement[]): Promise<void>;
}
export interface MetadataRepository {
@ -25,10 +25,10 @@ export class SyncQueue {
}
public static async create(repository: IncrementsRepository) {
const data = await repository.loadIncrements();
const increments = await repository.loadIncrements();
return new SyncQueue(
new Map(data?.increments?.map((increment) => [increment.id, increment])),
new Map(increments?.map((increment) => [increment.id, increment])),
repository,
);
}
@ -64,7 +64,7 @@ export class SyncQueue {
public persist = throttle(
async () => {
try {
await this.repository.saveIncrements({ increments: this.getAll() });
await this.repository.saveIncrements(this.getAll());
} catch (e) {
console.error("Failed to persist the sync queue:", e);
}
@ -72,4 +72,4 @@ export class SyncQueue {
1000,
{ leading: false, trailing: true },
);
}
}

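As a point of reference, a minimal sketch of a repository satisfying the reshaped client-side interface above, assuming a browser localStorage backing store; the class name and storage key are hypothetical.

```ts
import type { StoreIncrement } from "../store";

// Illustrative localStorage-backed repository matching the new interface signatures;
// a real implementation would also need to revive StoreIncrement instances from their DTOs.
class LocalIncrementsRepository {
  private static readonly KEY = "excalidraw-sync-queue"; // hypothetical storage key

  public async loadIncrements(): Promise<Array<StoreIncrement> | null> {
    const raw = localStorage.getItem(LocalIncrementsRepository.KEY);
    return raw ? (JSON.parse(raw) as Array<StoreIncrement>) : null;
  }

  public async saveIncrements(params: StoreIncrement[]): Promise<void> {
    localStorage.setItem(LocalIncrementsRepository.KEY, JSON.stringify(params));
  }
}
```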
View file

@ -3,14 +3,13 @@ import { Utils } from "./utils";
import type {
IncrementsRepository,
CLIENT_INCREMENT,
CLIENT_MESSAGE,
PULL_PAYLOAD,
PUSH_PAYLOAD,
RELAY_PAYLOAD,
SERVER_MESSAGE,
SERVER_INCREMENT,
CLIENT_MESSAGE_RAW,
CHUNK_INFO,
} from "./protocol";
// CFDO: message could be binary (cbor, protobuf, etc.)
@ -22,12 +21,13 @@ export class ExcalidrawSyncServer {
private readonly lock: AsyncLock = new AsyncLock();
private readonly sessions: Set<WebSocket> = new Set();
private readonly chunks = new Map<
CLIENT_MESSAGE_RAW["chunkInfo"]["id"],
Map<CLIENT_MESSAGE_RAW["chunkInfo"]["order"], CLIENT_MESSAGE_RAW["payload"]>
CHUNK_INFO["id"],
Map<CHUNK_INFO["position"], CLIENT_MESSAGE_RAW["payload"]>
>();
constructor(private readonly incrementsRepository: IncrementsRepository) {}
// CFDO: should send a message about collaborators (no collaborators => no need to send ephemerals)
public onConnect(client: WebSocket) {
this.sessions.add(client);
}
@ -50,9 +50,9 @@ export class ExcalidrawSyncServer {
const { type, payload, chunkInfo } = parsedMessage;
// if there is more than one chunk, process the chunks first
if (chunkInfo.count > 1) {
return this.processChunks(client, parsedMessage);
// if there is chunkInfo, there is more than one chunk => process the chunks first
if (chunkInfo) {
return this.processChunks(client, { type, payload, chunkInfo });
}
const [parsedPayload, parsePayloadError] = Utils.try<
@ -65,8 +65,8 @@ export class ExcalidrawSyncServer {
}
switch (type) {
case "relay":
return this.relay(client, parsedPayload as RELAY_PAYLOAD);
// case "relay":
// return this.relay(client, parsedPayload as RELAY_PAYLOAD);
case "pull":
return this.pull(client, parsedPayload as PULL_PAYLOAD);
case "push":
@ -83,12 +83,15 @@ export class ExcalidrawSyncServer {
/**
* Process chunks in case the client-side payload would overflow the 1 MiB Durable Object WS message limit.
*/
private processChunks(client: WebSocket, message: CLIENT_MESSAGE_RAW) {
private processChunks(
client: WebSocket,
message: CLIENT_MESSAGE_RAW & { chunkInfo: CHUNK_INFO },
) {
let shouldCleanupchunks = true;
const {
type,
payload,
chunkInfo: { id, order, count },
chunkInfo: { id, position, count },
} = message;
try {
@ -104,7 +107,7 @@ export class ExcalidrawSyncServer {
}
// store the chunk payload by its position
chunks.set(order, payload);
chunks.set(position, payload);
if (chunks.size !== count) {
// we don't have all the chunks, don't cleanup just yet!
@ -120,8 +123,6 @@ export class ExcalidrawSyncServer {
const rawMessage = JSON.stringify({
type,
payload: restoredPayload,
// id is irrelevant if we are sending just one chunk
chunkInfo: { id: "", order: 0, count: 1 },
} as CLIENT_MESSAGE_RAW);
// process the message
@ -136,18 +137,18 @@ export class ExcalidrawSyncServer {
}
}
private relay(
client: WebSocket,
payload: { increments: Array<CLIENT_INCREMENT> } | RELAY_PAYLOAD,
) {
return this.broadcast(
{
type: "relayed",
payload,
},
client,
);
}
// private relay(
// client: WebSocket,
// payload: { increments: Array<CLIENT_INCREMENT> } | RELAY_PAYLOAD,
// ) {
// return this.broadcast(
// {
// type: "relayed",
// payload,
// },
// client,
// );
// }
private pull(client: WebSocket, payload: PULL_PAYLOAD) {
// CFDO: test for invalid payload
@ -170,7 +171,7 @@ export class ExcalidrawSyncServer {
if (versionΔ > 0) {
increments.push(
...this.incrementsRepository.getSinceVersion(
...this.incrementsRepository.getAllSinceVersion(
lastAcknowledgedClientVersion,
),
);
@ -184,38 +185,29 @@ export class ExcalidrawSyncServer {
});
}
private push(client: WebSocket, payload: PUSH_PAYLOAD) {
const { type, increments } = payload;
private push(client: WebSocket, increment: PUSH_PAYLOAD) {
// CFDO: try to apply the increments to the snapshot
const [acknowledged, error] = Utils.try(() =>
this.incrementsRepository.save(increment),
);
switch (type) {
case "ephemeral":
return this.relay(client, { increments });
case "durable":
// CFDO: try to apply the increments to the snapshot
const [acknowledged, error] = Utils.try(() =>
this.incrementsRepository.saveAll(increments),
);
if (error) {
// everything should be automatically rolled back -> double-check
return this.send(client, {
type: "rejected",
payload: {
message: error.message,
increments,
},
});
}
return this.broadcast({
type: "acknowledged",
payload: {
increments: acknowledged,
},
});
default:
console.error(`Unknown push message type: ${type}`);
if (error || !acknowledged) {
// everything should be automatically rolled back -> double-check
return this.send(client, {
type: "rejected",
payload: {
message: error ? error.message : "Couldn't persist the increment",
increments: [increment],
},
});
}
return this.broadcast({
type: "acknowledged",
payload: {
increments: [acknowledged],
},
});
}
private send(client: WebSocket, message: SERVER_MESSAGE) {
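Finally, a standalone sketch of the chunk bookkeeping that processChunks performs, assuming the CHUNK_INFO shape from the protocol; `ChunkAssembler` and its `add()` API are illustrative, not the server's actual structure.

```ts
// Illustrative reassembly of chunked payloads, keyed by chunk id and position.
type ChunkInfo = { id: string; position: number; count: number };

class ChunkAssembler {
  private readonly chunks = new Map<ChunkInfo["id"], Map<number, string>>();

  // Returns the restored payload once all chunks for the given id have arrived, otherwise null.
  public add(payload: string, { id, position, count }: ChunkInfo): string | null {
    const parts = this.chunks.get(id) ?? new Map<number, string>();
    parts.set(position, payload);
    this.chunks.set(id, parts);

    if (parts.size !== count) {
      return null; // still waiting for the remaining chunks
    }

    this.chunks.delete(id); // clean up once the message is complete
    return Array.from(parts.entries())
      .sort(([a], [b]) => a - b)
      .map(([, part]) => part)
      .join("");
  }
}
```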