Reusing existing workers infrastructure (fallback to the main thread, type-safety)

This commit is contained in:
Marcel Mraz 2025-04-02 00:29:57 +01:00
parent 7f54214ac9
commit a30d73a5cd
10 changed files with 237 additions and 142 deletions

View file

@ -29,10 +29,12 @@ import { type AnimationFrameHandler } from "../animation-frame-handler";
import { AnimatedTrail } from "../animated-trail"; import { AnimatedTrail } from "../animated-trail";
import { LassoWorkerPolyfill } from "./lasso-worker-polyfill"; import {
getLassoSelectedElementIds,
type LassoWorkerInput,
} from "./lasso-main";
import type App from "../components/App"; import type App from "../components/App";
import type { LassoWorkerInput, LassoWorkerOutput } from "./types";
export class LassoTrail extends AnimatedTrail { export class LassoTrail extends AnimatedTrail {
private intersectedElements: Set<ExcalidrawElement["id"]> = new Set(); private intersectedElements: Set<ExcalidrawElement["id"]> = new Set();
@ -41,8 +43,6 @@ export class LassoTrail extends AnimatedTrail {
null; null;
private keepPreviousSelection: boolean = false; private keepPreviousSelection: boolean = false;
private worker: Worker | LassoWorkerPolyfill | null = null;
constructor(animationFrameHandler: AnimationFrameHandler, app: App) { constructor(animationFrameHandler: AnimationFrameHandler, app: App) {
super(animationFrameHandler, app, { super(animationFrameHandler, app, {
animateTrail: true, animateTrail: true,
@ -82,29 +82,6 @@ export class LassoTrail extends AnimatedTrail {
selectedLinearElement: null, selectedLinearElement: null,
}); });
} }
if (!this.worker) {
try {
const { WorkerUrl } = await import("./lasso-worker.chunk");
if (typeof Worker !== "undefined" && WorkerUrl) {
this.worker = new Worker(WorkerUrl, { type: "module" });
} else {
this.worker = new LassoWorkerPolyfill();
}
this.worker.onmessage = (event: MessageEvent<LassoWorkerOutput>) => {
const { selectedElementIds } = event.data;
this.selectElementsFromIds(selectedElementIds);
};
this.worker.onerror = (error) => {
console.error("Worker error:", error);
};
} catch (error) {
console.error("Failed to start worker", error);
}
}
} }
selectElementsFromIds = (ids: string[]) => { selectElementsFromIds = (ids: string[]) => {
@ -191,7 +168,7 @@ export class LassoTrail extends AnimatedTrail {
this.updateSelection(); this.updateSelection();
}; };
private updateSelection = () => { private updateSelection = async () => {
const lassoPath = super const lassoPath = super
.getCurrentTrail() .getCurrentTrail()
?.originalPoints?.map((p) => pointFrom<GlobalPoint>(p[0], p[1])); ?.originalPoints?.map((p) => pointFrom<GlobalPoint>(p[0], p[1]));
@ -206,7 +183,8 @@ export class LassoTrail extends AnimatedTrail {
} }
if (lassoPath) { if (lassoPath) {
const message: LassoWorkerInput = { // need to omit command, otherwise "shared" chunk will be included in the main bundle by default
const message: Omit<LassoWorkerInput, "command"> = {
lassoPath, lassoPath,
elements: this.app.visibleElements, elements: this.app.visibleElements,
elementsSegments: this.elementsSegments, elementsSegments: this.elementsSegments,
@ -215,7 +193,9 @@ export class LassoTrail extends AnimatedTrail {
simplifyDistance: 5 / this.app.state.zoom.value, simplifyDistance: 5 / this.app.state.zoom.value,
}; };
this.worker?.postMessage(message); const { selectedElementIds } = await getLassoSelectedElementIds(message);
this.selectElementsFromIds(selectedElementIds);
} }
}; };

View file

@ -0,0 +1,123 @@
import { promiseTry } from "@excalidraw/common";
import type { ExcalidrawElement } from "@excalidraw/element/types";
import type { GlobalPoint } from "@excalidraw/math";
import { WorkerPool } from "../workers";
import type { Commands, ElementsSegmentsMap } from "./lasso-shared.chunk";
let shouldUseWorkers = typeof Worker !== "undefined";
/**
* Tries to get the selected element with a worker, if it fails, it fallbacks to the main thread.
*
* @param input - The input data for the lasso selection.
* @returns The selected element ids.
*/
export const getLassoSelectedElementIds = async (
input: Omit<LassoWorkerInput, "command">,
): Promise<
LassoWorkerOutput<typeof Commands.GET_LASSO_SELECTED_ELEMENT_IDS>
> => {
const { Commands, getLassoSelectedElementIds } = await lazyLoadLassoSharedChunk();
const inputWithCommand: LassoWorkerInput = {
...input,
command: Commands.GET_LASSO_SELECTED_ELEMENT_IDS,
};
if (!shouldUseWorkers) {
return getLassoSelectedElementIds(inputWithCommand);
}
return promiseTry(async () => {
try {
const workerPool = await getOrCreateWorkerPool();
const result = await workerPool.postMessage(inputWithCommand, {});
return result;
} catch (e) {
// don't use workers if they are failing
shouldUseWorkers = false;
// eslint-disable-next-line no-console
console.error(
"Failed to use workers for lasso selection, falling back to the main thread.",
e,
);
// fallback to the main thread
return getLassoSelectedElementIds(inputWithCommand);
}
});
};
// lazy-loaded and cached chunks
let lassoWorker: Promise<typeof import("./lasso-worker.chunk")> | null = null;
let lassoShared: Promise<typeof import("./lasso-shared.chunk")> | null = null;
export const lazyLoadLassoWorkerChunk = async () => {
if (!lassoWorker) {
lassoWorker = import("./lasso-worker.chunk");
}
return lassoWorker;
};
export const lazyLoadLassoSharedChunk = async () => {
if (!lassoShared) {
lassoShared = import("./lasso-shared.chunk");
}
return lassoShared;
};
export type LassoWorkerInput = {
command: typeof Commands.GET_LASSO_SELECTED_ELEMENT_IDS;
lassoPath: GlobalPoint[];
elements: readonly ExcalidrawElement[];
elementsSegments: ElementsSegmentsMap;
intersectedElements: Set<ExcalidrawElement["id"]>;
enclosedElements: Set<ExcalidrawElement["id"]>;
simplifyDistance?: number;
};
export type LassoWorkerOutput<T extends LassoWorkerInput["command"]> =
T extends typeof Commands.GET_LASSO_SELECTED_ELEMENT_IDS
? {
selectedElementIds: string[];
}
: never;
let workerPool: Promise<
WorkerPool<LassoWorkerInput, LassoWorkerOutput<LassoWorkerInput["command"]>>
> | null = null;
/**
* Lazy initialize or get the worker pool singleton.
*
* @throws implicitly if anything goes wrong
*/
const getOrCreateWorkerPool = () => {
if (!workerPool) {
// immediate concurrent-friendly return, to ensure we have only one pool instance
workerPool = promiseTry(async () => {
const { WorkerUrl } = await lazyLoadLassoWorkerChunk();
const pool = WorkerPool.create<
LassoWorkerInput,
LassoWorkerOutput<LassoWorkerInput["command"]>
>(WorkerUrl, {
// limit the pool size to a single active worker
maxPoolSize: 1,
});
return pool;
});
}
return workerPool;
};

View file

@ -9,13 +9,20 @@ import type {
} from "@excalidraw/math/types"; } from "@excalidraw/math/types";
import type { ExcalidrawElement } from "@excalidraw/element/types"; import type { ExcalidrawElement } from "@excalidraw/element/types";
import type { import type { LassoWorkerInput, LassoWorkerOutput } from "./lasso-main";
ElementsSegmentsMap,
LassoWorkerInput,
LassoWorkerOutput,
} from "./types";
export const updateSelection = (input: LassoWorkerInput): LassoWorkerOutput => { export type ElementsSegmentsMap = Map<string, LineSegment<GlobalPoint>[]>;
/**
* Shared commands between the main thread and worker threads.
*/
export const Commands = {
GET_LASSO_SELECTED_ELEMENT_IDS: "GET_LASSO_SELECTED_ELEMENT_IDS",
} as const;
export const getLassoSelectedElementIds = (
input: LassoWorkerInput,
): LassoWorkerOutput<typeof Commands.GET_LASSO_SELECTED_ELEMENT_IDS> => {
const { const {
lassoPath, lassoPath,
elements, elements,

View file

@ -1,28 +0,0 @@
import { updateSelection } from "./utils";
import type { LassoWorkerInput, LassoWorkerOutput } from "./types";
export class LassoWorkerPolyfill {
public onmessage: ((event: MessageEvent<LassoWorkerOutput>) => void) | null =
null;
public onerror: ((event: ErrorEvent) => void) | null = null;
postMessage(data: LassoWorkerInput) {
try {
// run asynchronously to simulate a real worker
setTimeout(() => {
const selectedElementIds = updateSelection(data);
const messageEvent = {
data: selectedElementIds,
} as MessageEvent<LassoWorkerOutput>;
this.onmessage?.(messageEvent);
}, 0);
} catch (error) {
this.onerror?.(new ErrorEvent("error", { error }));
}
}
terminate() {
// no-op for polyfill
}
}

View file

@ -1,7 +1,13 @@
import { updateSelection } from "./utils"; import { Commands, getLassoSelectedElementIds } from "./lasso-shared.chunk";
import type { LassoWorkerInput } from "./types"; import type { LassoWorkerInput } from "./lasso-main";
/**
* Due to this export (and related dynamic import), this worker code will be included in the bundle automatically (as a separate chunk),
* without the need for esbuild / vite /rollup plugins and special browser / server treatment.
*
* `import.meta.url` is undefined in nodejs
*/
export const WorkerUrl: URL | undefined = import.meta.url export const WorkerUrl: URL | undefined = import.meta.url
? new URL(import.meta.url) ? new URL(import.meta.url)
: undefined; : undefined;
@ -11,21 +17,24 @@ export const WorkerUrl: URL | undefined = import.meta.url
let isProcessing: boolean = false; let isProcessing: boolean = false;
let latestInputData: LassoWorkerInput | null = null; let latestInputData: LassoWorkerInput | null = null;
self.onmessage = (event: MessageEvent<LassoWorkerInput>) => { // run only in the worker context
if (!event.data) { if (typeof window === "undefined" && typeof self !== "undefined") {
self.postMessage({ self.onmessage = (event: MessageEvent<LassoWorkerInput>) => {
error: "No data received", if (!event.data) {
selectedElementIds: [], self.postMessage({
}); error: "No data received",
return; selectedElementIds: [],
} });
return;
}
latestInputData = event.data; latestInputData = event.data;
if (!isProcessing) { if (!isProcessing) {
processInputData(); processInputData();
} }
}; };
}
// function to process the latest data // function to process the latest data
const processInputData = () => { const processInputData = () => {
@ -40,29 +49,12 @@ const processInputData = () => {
isProcessing = true; isProcessing = true;
try { try {
const { lassoPath, elements, intersectedElements, enclosedElements } = switch (dataToProcess.command) {
dataToProcess; case Commands.GET_LASSO_SELECTED_ELEMENT_IDS:
const result = getLassoSelectedElementIds(dataToProcess);
if (!Array.isArray(lassoPath) || !Array.isArray(elements)) { self.postMessage(result);
throw new Error("Invalid input: lassoPath and elements must be arrays"); break;
} }
if (
!(intersectedElements instanceof Set) ||
!(enclosedElements instanceof Set)
) {
throw new Error(
"Invalid input: intersectedElements and enclosedElements must be Sets",
);
}
const result = updateSelection(dataToProcess);
self.postMessage(result);
} catch (error) {
self.postMessage({
error: error instanceof Error ? error.message : "Unknown error occurred",
selectedElementIds: [],
});
} finally { } finally {
isProcessing = false; isProcessing = false;
// if new data arrived during processing, process it // if new data arrived during processing, process it

View file

@ -1,17 +0,0 @@
import type { GlobalPoint, LineSegment } from "@excalidraw/math/types";
import type { ExcalidrawElement } from "@excalidraw/element/types";
export type ElementsSegmentsMap = Map<string, LineSegment<GlobalPoint>[]>;
export type LassoWorkerInput = {
lassoPath: GlobalPoint[];
elements: readonly ExcalidrawElement[];
elementsSegments: ElementsSegmentsMap;
intersectedElements: Set<ExcalidrawElement["id"]>;
enclosedElements: Set<ExcalidrawElement["id"]>;
simplifyDistance?: number;
};
export type LassoWorkerOutput = {
selectedElementIds: string[];
};

View file

@ -23,7 +23,7 @@ export const subsetWoff2GlyphsByCodepoints = async (
codePoints: Array<number>, codePoints: Array<number>,
): Promise<string> => { ): Promise<string> => {
const { Commands, subsetToBase64, toBase64 } = const { Commands, subsetToBase64, toBase64 } =
await lazyLoadSharedSubsetChunk(); await lazyLoadSubsetSharedChunk();
if (!shouldUseWorkers) { if (!shouldUseWorkers) {
return subsetToBase64(arrayBuffer, codePoints); return subsetToBase64(arrayBuffer, codePoints);
@ -75,7 +75,7 @@ export const subsetWoff2GlyphsByCodepoints = async (
let subsetWorker: Promise<typeof import("./subset-worker.chunk")> | null = null; let subsetWorker: Promise<typeof import("./subset-worker.chunk")> | null = null;
let subsetShared: Promise<typeof import("./subset-shared.chunk")> | null = null; let subsetShared: Promise<typeof import("./subset-shared.chunk")> | null = null;
const lazyLoadWorkerSubsetChunk = async () => { const lazyLoadSubsetWorkerChunk = async () => {
if (!subsetWorker) { if (!subsetWorker) {
subsetWorker = import("./subset-worker.chunk"); subsetWorker = import("./subset-worker.chunk");
} }
@ -83,7 +83,7 @@ const lazyLoadWorkerSubsetChunk = async () => {
return subsetWorker; return subsetWorker;
}; };
const lazyLoadSharedSubsetChunk = async () => { const lazyLoadSubsetSharedChunk = async () => {
if (!subsetShared) { if (!subsetShared) {
// load dynamically to force create a shared chunk reused between main thread and the worker thread // load dynamically to force create a shared chunk reused between main thread and the worker thread
subsetShared = import("./subset-shared.chunk"); subsetShared = import("./subset-shared.chunk");
@ -93,17 +93,20 @@ const lazyLoadSharedSubsetChunk = async () => {
}; };
// could be extended with multiple commands in the future // could be extended with multiple commands in the future
type SubsetWorkerData = { export type SubsetWorkerInput = {
command: typeof Commands.Subset; command: typeof Commands.Subset;
arrayBuffer: ArrayBuffer; arrayBuffer: ArrayBuffer;
codePoints: Array<number>; codePoints: Array<number>;
}; };
type SubsetWorkerResult<T extends SubsetWorkerData["command"]> = export type SubsetWorkerOutput<T extends SubsetWorkerInput["command"]> =
T extends typeof Commands.Subset ? ArrayBuffer : never; T extends typeof Commands.Subset ? ArrayBuffer : never;
let workerPool: Promise< let workerPool: Promise<
WorkerPool<SubsetWorkerData, SubsetWorkerResult<SubsetWorkerData["command"]>> WorkerPool<
SubsetWorkerInput,
SubsetWorkerOutput<SubsetWorkerInput["command"]>
>
> | null = null; > | null = null;
/** /**
@ -115,11 +118,11 @@ const getOrCreateWorkerPool = () => {
if (!workerPool) { if (!workerPool) {
// immediate concurrent-friendly return, to ensure we have only one pool instance // immediate concurrent-friendly return, to ensure we have only one pool instance
workerPool = promiseTry(async () => { workerPool = promiseTry(async () => {
const { WorkerUrl } = await lazyLoadWorkerSubsetChunk(); const { WorkerUrl } = await lazyLoadSubsetWorkerChunk();
const pool = WorkerPool.create< const pool = WorkerPool.create<
SubsetWorkerData, SubsetWorkerInput,
SubsetWorkerResult<SubsetWorkerData["command"]> SubsetWorkerOutput<SubsetWorkerInput["command"]>
>(WorkerUrl); >(WorkerUrl);
return pool; return pool;

View file

@ -9,6 +9,8 @@
import { Commands, subsetToBinary } from "./subset-shared.chunk"; import { Commands, subsetToBinary } from "./subset-shared.chunk";
import type { SubsetWorkerInput } from "./subset-main";
/** /**
* Due to this export (and related dynamic import), this worker code will be included in the bundle automatically (as a separate chunk), * Due to this export (and related dynamic import), this worker code will be included in the bundle automatically (as a separate chunk),
* without the need for esbuild / vite /rollup plugins and special browser / server treatment. * without the need for esbuild / vite /rollup plugins and special browser / server treatment.
@ -21,13 +23,7 @@ export const WorkerUrl: URL | undefined = import.meta.url
// run only in the worker context // run only in the worker context
if (typeof window === "undefined" && typeof self !== "undefined") { if (typeof window === "undefined" && typeof self !== "undefined") {
self.onmessage = async (e: { self.onmessage = async (e: MessageEvent<SubsetWorkerInput>) => {
data: {
command: typeof Commands.Subset;
arrayBuffer: ArrayBuffer;
codePoints: Array<number>;
};
}) => {
switch (e.data.command) { switch (e.data.command) {
case Commands.Subset: case Commands.Subset:
const buffer = await subsetToBinary( const buffer = await subsetToBinary(

View file

@ -25,14 +25,18 @@ import { getElementLineSegments } from "@excalidraw/element/bounds";
import type { ExcalidrawElement } from "@excalidraw/element/types"; import type { ExcalidrawElement } from "@excalidraw/element/types";
import { act, render } from "../tests/test-utils";
import { Excalidraw } from "../index"; import { Excalidraw } from "../index";
import { getSelectedElements } from "../scene"; import { getSelectedElements } from "../scene";
import { updateSelection } from "./utils"; import {
Commands,
getLassoSelectedElementIds,
} from "../lasso/lasso-shared.chunk";
import type { ElementsSegmentsMap } from "./types"; import { act, render } from "./test-utils";
import type { ElementsSegmentsMap } from "../lasso/lasso-shared.chunk";
const { h } = window; const { h } = window;
@ -63,7 +67,8 @@ const updatePath = (startPoint: GlobalPoint, points: LocalPoint[]) => {
elementsSegments.set(element.id, segments); elementsSegments.set(element.id, segments);
} }
const result = updateSelection({ const result = getLassoSelectedElementIds({
command: Commands.GET_LASSO_SELECTED_ELEMENT_IDS,
lassoPath: lassoPath:
h.app.lassoTrail h.app.lassoTrail
.getCurrentTrail() .getCurrentTrail()

View file

@ -16,24 +16,28 @@ class IdleWorker {
} }
/** /**
* Pool of idle short-lived workers. * Pool of idle short-lived workers, so that they can be reused in a short period of time (`ttl`), instead of having to create a new worker from scratch.
*
* IMPORTANT: for simplicity it does not limit the number of newly created workers, leaving it up to the caller to manage the pool size.
*/ */
export class WorkerPool<T, R> { export class WorkerPool<T, R> {
private idleWorkers: Set<IdleWorker> = new Set(); private idleWorkers: Set<IdleWorker> = new Set();
private activeWorkers: Set<IdleWorker> = new Set();
private readonly workerUrl: URL; private readonly workerUrl: URL;
private readonly workerTTL: number; private readonly workerTTL: number;
private readonly maxPoolSize: number;
private constructor( private constructor(
workerUrl: URL, workerUrl: URL,
options: { options: {
ttl?: number; ttl?: number;
maxPoolSize?: number;
}, },
) { ) {
this.workerUrl = workerUrl; this.workerUrl = workerUrl;
// by default, active & idle workers will be terminated after 1s of inactivity // by default, active & idle workers will be terminated after 1s of inactivity
this.workerTTL = options.ttl || 1000; this.workerTTL = options.ttl || 1000;
// by default, active workers are limited to 3 instances
this.maxPoolSize = options.maxPoolSize || 3;
} }
/** /**
@ -48,6 +52,7 @@ export class WorkerPool<T, R> {
workerUrl: URL | undefined, workerUrl: URL | undefined,
options: { options: {
ttl?: number; ttl?: number;
maxPoolSize?: number;
} = {}, } = {},
): WorkerPool<T, R> { ): WorkerPool<T, R> {
if (!workerUrl) { if (!workerUrl) {
@ -72,13 +77,18 @@ export class WorkerPool<T, R> {
let worker: IdleWorker; let worker: IdleWorker;
const idleWorker = Array.from(this.idleWorkers).shift(); const idleWorker = Array.from(this.idleWorkers).shift();
if (idleWorker) { if (idleWorker) {
this.idleWorkers.delete(idleWorker); this.idleWorkers.delete(idleWorker);
worker = idleWorker; worker = idleWorker;
} else { } else if (this.activeWorkers.size < this.maxPoolSize) {
worker = await this.createWorker(); worker = await this.createWorker();
} else {
worker = await this.waitForActiveWorker();
} }
this.activeWorkers.add(worker);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
worker.instance.onmessage = this.onMessageHandler(worker, resolve); worker.instance.onmessage = this.onMessageHandler(worker, resolve);
worker.instance.onerror = this.onErrorHandler(worker, reject); worker.instance.onerror = this.onErrorHandler(worker, reject);
@ -101,7 +111,13 @@ export class WorkerPool<T, R> {
worker.instance.terminate(); worker.instance.terminate();
} }
for (const worker of this.activeWorkers) {
worker.debounceTerminate.cancel();
worker.instance.terminate();
}
this.idleWorkers.clear(); this.idleWorkers.clear();
this.activeWorkers.clear();
} }
/** /**
@ -130,9 +146,25 @@ export class WorkerPool<T, R> {
return worker; return worker;
} }
private waitForActiveWorker(): Promise<IdleWorker> {
return Promise.race(
Array.from(this.activeWorkers).map(
(worker) =>
new Promise<IdleWorker>((resolve) => {
const originalOnMessage = worker.instance.onmessage;
worker.instance.onmessage = (e) => {
worker.instance.onmessage = originalOnMessage;
resolve(worker);
};
}),
),
);
}
private onMessageHandler(worker: IdleWorker, resolve: (value: R) => void) { private onMessageHandler(worker: IdleWorker, resolve: (value: R) => void) {
return (e: { data: R }) => { return (e: { data: R }) => {
worker.debounceTerminate(); worker.debounceTerminate();
this.activeWorkers.delete(worker);
this.idleWorkers.add(worker); this.idleWorkers.add(worker);
resolve(e.data); resolve(e.data);
}; };
@ -143,6 +175,8 @@ export class WorkerPool<T, R> {
reject: (reason: ErrorEvent) => void, reject: (reason: ErrorEvent) => void,
) { ) {
return (e: ErrorEvent) => { return (e: ErrorEvent) => {
this.activeWorkers.delete(worker);
// terminate the worker immediately before rejection // terminate the worker immediately before rejection
worker.debounceTerminate(() => reject(e)); worker.debounceTerminate(() => reject(e));
worker.debounceTerminate.flush(); worker.debounceTerminate.flush();