Various sync & time travel fixes

This commit is contained in:
Marcel Mraz 2024-12-23 16:48:45 +01:00
parent 6a17541713
commit 1abb901ec2
No known key found for this signature in database
GPG key ID: 4EBD6E62DC830CD2
5 changed files with 46 additions and 37 deletions

View file

@ -1,5 +1,4 @@
/* eslint-disable no-console */
import debounce from "lodash.debounce";
import throttle from "lodash.throttle";
import ReconnectingWebSocket, {
type Event,
@ -19,11 +18,18 @@ import type {
PUSH_PAYLOAD,
SERVER_INCREMENT,
} from "./protocol";
import { debounce } from "../utils";
const NO_STATUS_RECEIVED_ERROR_CODE = 1005;
const ABNORMAL_CLOSURE_ERROR_CODE = 1006;
interface AcknowledgedIncrement {
increment: StoreIncrement;
version: number;
}
export class SyncClient {
public static readonly NORMAL_CLOSURE = 1000;
public static readonly NO_STATUS_RECEIVED_ERROR_CODE = 1005;
public static readonly ABNORMAL_CLOSURE_ERROR_CODE = 1006;
private static readonly HOST_URL = import.meta.env.DEV
? "ws://localhost:8787"
: "https://excalidraw-sync.marcel-529.workers.dev";
@ -38,11 +44,15 @@ export class SyncClient {
private readonly repository: MetadataRepository;
// CFDO: shouldn't be stateful, only request / response
private readonly acknowledgedIncrementsMap: Map<string, StoreIncrement> =
new Map();
private readonly acknowledgedIncrementsMap: Map<
string,
AcknowledgedIncrement
> = new Map();
public get acknowledgedIncrements() {
return Array.from(this.acknowledgedIncrementsMap.values());
return Array.from(this.acknowledgedIncrementsMap.values())
.sort((a, b) => (a.version < b.version ? -1 : 1))
.map((x) => x.increment);
}
private readonly roomId: string;
@ -87,7 +97,6 @@ export class SyncClient {
});
}
// CFDO I: throttle does not work that well here (after some period it tries to reconnect too often)
public connect = throttle(
() => {
if (this.server && this.server.readyState !== this.server.CLOSED) {
@ -121,7 +130,7 @@ export class SyncClient {
);
public disconnect = throttle(
(code?: number, reason?: string) => {
(code: number, reason?: string) => {
if (!this.server) {
return;
}
@ -134,7 +143,9 @@ export class SyncClient {
}
console.log(
`Disconnecting from the sync server with code "${code}" and reason "${reason}"...`,
`Disconnecting from the sync server with code "${code}"${
reason ? ` and reason "${reason}".` : "."
}`,
);
this.server.removeEventListener("message", this.onMessage);
this.server.removeEventListener("open", this.onOpen);
@ -153,7 +164,7 @@ export class SyncClient {
private onClose = (event: CloseEvent) => {
this.disconnect(
event.code || NO_STATUS_RECEIVED_ERROR_CODE,
event.code || SyncClient.NO_STATUS_RECEIVED_ERROR_CODE,
event.reason || "Connection closed without a reason",
);
};
@ -161,8 +172,8 @@ export class SyncClient {
private onError = (event: Event) => {
this.disconnect(
event.type === "error"
? ABNORMAL_CLOSURE_ERROR_CODE
: NO_STATUS_RECEIVED_ERROR_CODE,
? SyncClient.ABNORMAL_CLOSURE_ERROR_CODE
: SyncClient.NO_STATUS_RECEIVED_ERROR_CODE,
`Received "${event.type}" on the sync connection`,
);
};
@ -227,12 +238,9 @@ export class SyncClient {
});
}
// CFDO: should be flushed once regular push / pull goes through
private schedulePush = (ms: number = 1000) =>
debounce(this.push, ms, { leading: true, trailing: false });
private schedulePull = (ms: number = 1000) =>
debounce(this.pull, ms, { leading: true, trailing: false });
// CFDO: could be flushed once regular push / pull goes through
private schedulePush = debounce(() => this.push(), 1000);
private schedulePull = debounce(() => this.pull(), 1000);
// CFDO: refactor by applying all operations to store, not to the elements
private handleAcknowledged(payload: { increments: Array<SERVER_INCREMENT> }) {
@ -246,11 +254,12 @@ export class SyncClient {
const { increments: remoteIncrements } = payload;
// apply remote increments
for (const { id, version, payload } of remoteIncrements.sort((a, b) =>
a.version <= b.version ? -1 : 1,
)) {
for (const { id, version, payload } of remoteIncrements) {
// CFDO: temporary to load all increments on init
this.acknowledgedIncrementsMap.set(id, StoreIncrement.load(payload));
this.acknowledgedIncrementsMap.set(id, {
increment: StoreIncrement.load(payload),
version,
});
// local increment shall not have to be applied again
if (this.queue.has(id)) {
@ -301,11 +310,11 @@ export class SyncClient {
this.lastAcknowledgedVersion = nextAcknowledgedVersion;
} catch (e) {
console.error("Failed to apply acknowledged increments:", e);
this.schedulePull().call(this);
this.schedulePull();
return;
}
this.schedulePush().call(this);
this.schedulePush();
}
private handleRejected(payload: { ids: Array<string>; message: string }) {