mirror of
https://github.com/immich-app/immich.git
synced 2026-01-24 02:14:46 -08:00
perf - replace broadcast channel with direct postMessage
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import { ServiceWorkerMessenger } from './sw-messenger';
|
||||
|
||||
const messenger = new ServiceWorkerMessenger('immich');
|
||||
const messenger = new ServiceWorkerMessenger();
|
||||
|
||||
let isServiceWorkerEnabled = true;
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/**
|
||||
* Low-level protocol for communicating with the service worker via BroadcastChannel.
|
||||
* Low-level protocol for communicating with the service worker via postMessage.
|
||||
*
|
||||
* Protocol:
|
||||
* 1. Main thread sends request: { type: string, requestId: string, ...data }
|
||||
@@ -16,19 +16,22 @@ interface PendingRequest {
|
||||
}
|
||||
|
||||
export class ServiceWorkerMessenger {
|
||||
readonly #broadcast: BroadcastChannel;
|
||||
readonly #pendingRequests = new Map<string, PendingRequest>();
|
||||
readonly #ackTimeoutMs: number;
|
||||
#requestCounter = 0;
|
||||
#onTimeout?: (type: string, data: Record<string, unknown>) => void;
|
||||
#messageHandler?: (event: MessageEvent) => void;
|
||||
|
||||
constructor(channelName: string, ackTimeoutMs = 5000) {
|
||||
this.#broadcast = new BroadcastChannel(channelName);
|
||||
constructor(ackTimeoutMs = 5000) {
|
||||
this.#ackTimeoutMs = ackTimeoutMs;
|
||||
|
||||
this.#broadcast.addEventListener('message', (event) => {
|
||||
this.#handleMessage(event.data);
|
||||
});
|
||||
// Listen for messages from the service worker
|
||||
if ('serviceWorker' in navigator) {
|
||||
this.#messageHandler = (event) => {
|
||||
this.#handleMessage(event.data);
|
||||
};
|
||||
navigator.serviceWorker.addEventListener('message', this.#messageHandler);
|
||||
}
|
||||
}
|
||||
|
||||
#handleMessage(data: unknown) {
|
||||
@@ -94,7 +97,7 @@ export class ServiceWorkerMessenger {
|
||||
if (waitForResponse) {
|
||||
reject(new Error(`Service worker did not acknowledge ${type} request`));
|
||||
} else {
|
||||
pending.resolveAck();
|
||||
resolve(undefined as T);
|
||||
}
|
||||
}
|
||||
}, this.#ackTimeoutMs);
|
||||
@@ -107,7 +110,10 @@ export class ServiceWorkerMessenger {
|
||||
ackReceived: false,
|
||||
});
|
||||
|
||||
this.#broadcast.postMessage({
|
||||
// Send message to the active service worker
|
||||
// Feature detection is done in constructor and at call sites (sw-messaging.ts:isValidSwContext)
|
||||
// eslint-disable-next-line compat/compat
|
||||
navigator.serviceWorker.controller?.postMessage({
|
||||
type,
|
||||
requestId,
|
||||
...data,
|
||||
@@ -120,7 +126,7 @@ export class ServiceWorkerMessenger {
|
||||
/**
|
||||
* Send a one-way message to the service worker.
|
||||
* Returns a promise that resolves after the service worker acknowledges receipt.
|
||||
* Rejects if no ack is received within the timeout period.
|
||||
* Resolves even if no ack is received within the timeout period.
|
||||
*/
|
||||
send(type: string, data: Record<string, unknown>): Promise<void> {
|
||||
return this.#sendInternal<void>(type, data, false);
|
||||
@@ -135,9 +141,17 @@ export class ServiceWorkerMessenger {
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the broadcast channel
|
||||
* Clean up pending requests and remove event listener
|
||||
*/
|
||||
close(): void {
|
||||
this.#broadcast.close();
|
||||
for (const pending of this.#pendingRequests.values()) {
|
||||
clearTimeout(pending.ackTimeout);
|
||||
}
|
||||
this.#pendingRequests.clear();
|
||||
|
||||
if (this.#messageHandler && 'serviceWorker' in navigator) {
|
||||
navigator.serviceWorker.removeEventListener('message', this.#messageHandler);
|
||||
this.#messageHandler = undefined;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,42 +0,0 @@
|
||||
import { handleCancel } from './request';
|
||||
|
||||
/**
|
||||
* Send acknowledgment for a request
|
||||
*/
|
||||
function sendAck(broadcast: BroadcastChannel, requestId: string) {
|
||||
broadcast.postMessage({
|
||||
type: 'ack',
|
||||
requestId,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle 'cancel' request: cancel a pending request
|
||||
*/
|
||||
const handleCancelRequest = (broadcast: BroadcastChannel, url: URL, requestId: string) => {
|
||||
sendAck(broadcast, requestId);
|
||||
handleCancel(url);
|
||||
};
|
||||
|
||||
export const installBroadcastChannelListener = () => {
|
||||
const broadcast = new BroadcastChannel('immich');
|
||||
// eslint-disable-next-line unicorn/prefer-add-event-listener
|
||||
broadcast.onmessage = (event) => {
|
||||
if (!event.data?.requestId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const requestId = event.data.requestId;
|
||||
const url = event.data.url ? new URL(event.data.url, self.location.origin) : undefined;
|
||||
if (!url) {
|
||||
return;
|
||||
}
|
||||
|
||||
switch (event.data.type) {
|
||||
case 'cancel': {
|
||||
handleCancelRequest(broadcast, url, requestId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
@@ -2,7 +2,8 @@
|
||||
/// <reference no-default-lib="true"/>
|
||||
/// <reference lib="esnext" />
|
||||
/// <reference lib="webworker" />
|
||||
import { installBroadcastChannelListener } from './broadcast-channel';
|
||||
|
||||
import { installMessageListener } from './messaging';
|
||||
import { handleFetch as handleAssetFetch } from './request';
|
||||
|
||||
const ASSET_REQUEST_REGEX = /^\/api\/assets\/[a-f0-9-]+\/(original|thumbnail)/;
|
||||
@@ -33,4 +34,4 @@ const handleFetch = (event: FetchEvent): void => {
|
||||
sw.addEventListener('install', handleInstall, { passive: true });
|
||||
sw.addEventListener('activate', handleActivate, { passive: true });
|
||||
sw.addEventListener('fetch', handleFetch, { passive: true });
|
||||
installBroadcastChannelListener();
|
||||
installMessageListener();
|
||||
|
||||
53
web/src/service-worker/messaging.ts
Normal file
53
web/src/service-worker/messaging.ts
Normal file
@@ -0,0 +1,53 @@
|
||||
/// <reference types="@sveltejs/kit" />
|
||||
/// <reference no-default-lib="true"/>
|
||||
/// <reference lib="esnext" />
|
||||
/// <reference lib="webworker" />
|
||||
|
||||
import { handleCancel } from './request';
|
||||
|
||||
const sw = globalThis as unknown as ServiceWorkerGlobalScope;
|
||||
|
||||
/**
|
||||
* Send acknowledgment for a request
|
||||
*/
|
||||
function sendAck(client: Client, requestId: string) {
|
||||
client.postMessage({
|
||||
type: 'ack',
|
||||
requestId,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle 'cancel' request: cancel a pending request
|
||||
*/
|
||||
const handleCancelRequest = (client: Client, url: URL, requestId: string) => {
|
||||
sendAck(client, requestId);
|
||||
handleCancel(url);
|
||||
};
|
||||
|
||||
export const installMessageListener = () => {
|
||||
sw.addEventListener('message', (event) => {
|
||||
if (!event.data?.requestId || !event.data?.type) {
|
||||
return;
|
||||
}
|
||||
|
||||
const requestId = event.data.requestId;
|
||||
|
||||
switch (event.data.type) {
|
||||
case 'cancel': {
|
||||
const url = event.data.url ? new URL(event.data.url, self.location.origin) : undefined;
|
||||
if (!url) {
|
||||
return;
|
||||
}
|
||||
|
||||
const client = event.source;
|
||||
if (!client) {
|
||||
return;
|
||||
}
|
||||
|
||||
handleCancelRequest(client, url, requestId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
};
|
||||
@@ -1,3 +1,8 @@
|
||||
/// <reference types="@sveltejs/kit" />
|
||||
/// <reference no-default-lib="true"/>
|
||||
/// <reference lib="esnext" />
|
||||
/// <reference lib="webworker" />
|
||||
|
||||
type PendingRequest = {
|
||||
controller: AbortController;
|
||||
promise: Promise<Response>;
|
||||
@@ -8,7 +13,7 @@ const pendingRequests = new Map<string, PendingRequest>();
|
||||
|
||||
const getRequestKey = (request: URL | Request): string => (request instanceof URL ? request.href : request.url);
|
||||
|
||||
const CANCELED_MESSAGE = 'Canceled - this is normal';
|
||||
const CANCELATION_MESSAGE = 'Request canceled by application';
|
||||
const CLEANUP_TIMEOUT_MS = 5 * 60 * 1000; // 5 minutes
|
||||
|
||||
export const handleFetch = (request: URL | Request): Promise<Response> => {
|
||||
@@ -16,16 +21,22 @@ export const handleFetch = (request: URL | Request): Promise<Response> => {
|
||||
const existing = pendingRequests.get(requestKey);
|
||||
|
||||
if (existing) {
|
||||
// Clone the response from the shared promise to avoid "Response is disturbed or locked" errors
|
||||
// Clone the response since response bodies can only be read once
|
||||
// Each caller gets an independent clone they can consume
|
||||
return existing.promise.then((response) => response.clone());
|
||||
}
|
||||
|
||||
const controller = new AbortController();
|
||||
const pendingRequest: PendingRequest = {
|
||||
controller: new AbortController(),
|
||||
promise: undefined as unknown as Promise<Response>,
|
||||
};
|
||||
pendingRequests.set(requestKey, pendingRequest);
|
||||
|
||||
// NOTE: fetch returns after headers received, not the body
|
||||
const promise = fetch(request, { signal: controller.signal })
|
||||
pendingRequest.promise = fetch(request, { signal: pendingRequest.controller.signal })
|
||||
.catch((error: unknown) => {
|
||||
const standardError = error instanceof Error ? error : new Error(String(error));
|
||||
if (standardError.name === 'AbortError' || standardError.message === CANCELED_MESSAGE) {
|
||||
if (standardError.name === 'AbortError' || standardError.message === CANCELATION_MESSAGE) {
|
||||
// dummy response avoids network errors in the console for these requests
|
||||
return new Response(undefined, { status: 204 });
|
||||
}
|
||||
@@ -36,20 +47,11 @@ export const handleFetch = (request: URL | Request): Promise<Response> => {
|
||||
const cleanupTimeout = setTimeout(() => {
|
||||
pendingRequests.delete(requestKey);
|
||||
}, CLEANUP_TIMEOUT_MS);
|
||||
|
||||
const pendingRequest = pendingRequests.get(requestKey);
|
||||
if (pendingRequest) {
|
||||
pendingRequest.cleanupTimeout = cleanupTimeout;
|
||||
}
|
||||
pendingRequest.cleanupTimeout = cleanupTimeout;
|
||||
});
|
||||
|
||||
pendingRequests.set(requestKey, {
|
||||
controller,
|
||||
promise,
|
||||
});
|
||||
|
||||
// Clone for the first caller, so the promise retains the unconsumed original response for future callers
|
||||
return promise.then((response) => response.clone());
|
||||
// Clone for the first caller to keep the original response unconsumed for future callers
|
||||
return pendingRequest.promise.then((response) => response.clone());
|
||||
};
|
||||
|
||||
export const handleCancel = (url: URL) => {
|
||||
@@ -57,11 +59,10 @@ export const handleCancel = (url: URL) => {
|
||||
|
||||
const pendingRequest = pendingRequests.get(requestKey);
|
||||
if (pendingRequest) {
|
||||
pendingRequest.controller.abort(CANCELED_MESSAGE);
|
||||
pendingRequest.controller.abort(CANCELATION_MESSAGE);
|
||||
if (pendingRequest.cleanupTimeout) {
|
||||
clearTimeout(pendingRequest.cleanupTimeout);
|
||||
}
|
||||
pendingRequests.delete(requestKey);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user