diff --git a/README.md b/README.md index 9e02936b..bb5a4853 100755 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ FILES_REMOVE_GPS_METADATA=false DATASOURCE_TYPE='local' DATASOURCE_LOCAL_DIRECTORY='./uploads' -SCHEDULER_METRICS_INTERVAL=1min +TASKS_METRICS_INTERVAL=1min FILES_MAX_FILE_SIZE=100mb diff --git a/src/lib/config/read.ts b/src/lib/config/read.ts index a7fa9b43..21733892 100755 --- a/src/lib/config/read.ts +++ b/src/lib/config/read.ts @@ -20,7 +20,7 @@ export const rawConfig: any = { size: undefined, enabled: undefined, }, - scheduler: { + tasks: { deleteInterval: undefined, clearInvitesInterval: undefined, maxViewsInterval: undefined, @@ -136,11 +136,11 @@ export const PROP_TO_ENV = { 'chunks.size': 'CHUNKS_SIZE', 'chunks.enabled': 'CHUNKS_ENABLED', - 'scheduler.deleteInterval': 'SCHEDULER_DELETE_INTERVAL', - 'scheduler.clearInvitesInterval': 'SCHEDULER_CLEAR_INVITES_INTERVAL', - 'scheduler.maxViewsInterval': 'SCHEDULER_MAX_VIEWS_INTERVAL', - 'scheduler.thumbnailsInterval': 'SCHEDULER_THUMBNAILS_INTERVAL', - 'scheduler.metricsInterval': 'SCHEDULER_METRICS_INTERVAL', + 'tasks.deleteInterval': 'TASKS_DELETE_INTERVAL', + 'tasks.clearInvitesInterval': 'TASKS_CLEAR_INVITES_INTERVAL', + 'tasks.maxViewsInterval': 'TASKS_MAX_VIEWS_INTERVAL', + 'tasks.thumbnailsInterval': 'TASKS_THUMBNAILS_INTERVAL', + 'tasks.metricsInterval': 'TASKS_METRICS_INTERVAL', 'files.route': 'FILES_ROUTE', 'files.length': 'FILES_LENGTH', @@ -264,11 +264,11 @@ export function readEnv() { env('chunks.size', 'byte'), env('chunks.enabled', 'boolean'), - env('scheduler.deleteInterval', 'ms'), - env('scheduler.clearInvitesInterval', 'ms'), - env('scheduler.maxViewsInterval', 'ms'), - env('scheduler.thumbnailsInterval', 'ms'), - env('scheduler.metricsInterval', 'ms'), + env('tasks.deleteInterval', 'ms'), + env('tasks.clearInvitesInterval', 'ms'), + env('tasks.maxViewsInterval', 'ms'), + env('tasks.thumbnailsInterval', 'ms'), + env('tasks.metricsInterval', 'ms'), env('files.route', 'string'), env('files.length', 'number'), diff --git a/src/lib/config/validate.ts b/src/lib/config/validate.ts index 8328bbeb..fc2559b8 100755 --- a/src/lib/config/validate.ts +++ b/src/lib/config/validate.ts @@ -79,7 +79,7 @@ export const schema = z.object({ size: z.number().default(bytes('25mb')), enabled: z.boolean().default(true), }), - scheduler: z.object({ + tasks: z.object({ deleteInterval: z.number().default(ms('30min')), clearInvitesInterval: z.number().default(ms('30min')), maxViewsInterval: z.number().default(ms('30min')), diff --git a/src/lib/scheduler/index.ts b/src/lib/scheduler/index.ts deleted file mode 100755 index f9444f5b..00000000 --- a/src/lib/scheduler/index.ts +++ /dev/null @@ -1,125 +0,0 @@ -import { Worker } from 'worker_threads'; -import Logger, { log } from '../logger'; - -export interface Job { - id: string; - - started: boolean; - logger: Logger; - scheduler: Scheduler; -} - -export interface WorkerJob extends Job { - path: string; - data: Data; - - worker?: Worker; -} - -export interface IntervalJob extends Job { - interval: number; - func: () => void; - - timeout?: NodeJS.Timeout; -} - -export class Scheduler { - private logger: Logger = log('scheduler'); - - public constructor(public jobs: Job[] = []) {} - - public start(): void { - this.logger.debug('starting scheduler', { - jobs: this.jobs.length, - }); - - for (const job of this.jobs) { - if (job.started) continue; - - job.logger = this.logger.c('jobs').c(job.id); - job.scheduler = this; - - if ('interval' in job) { - this.logger.debug('running first run', { - id: job.id, - }); - (job as IntervalJob).func.bind(job)(); - - this.startInterval(job as IntervalJob); - } else if ('path' in job) { - this.startWorker(job as WorkerJob); - } - } - } - - private startInterval(job: IntervalJob) { - if (job.interval === 0) { - this.logger.debug('not starting interval', { - id: job.id, - interval: job.interval, - }); - - return; - } - - job.started = true; - - const timeout = setInterval(job.func.bind(job), job.interval); - job.timeout = timeout; - - this.logger.debug('started interval job', { - id: job.id, - interval: job.interval, - }); - } - - private startWorker(job: WorkerJob) { - job.started = true; - - const worker = new Worker(job.path, { - workerData: job.data, - }); - - worker.once('exit', (code) => { - this.logger.debug('worker exited', { - id: job.id, - code, - }); - - const index = this.jobs.findIndex((x) => x.id === job.id); - if (index === -1) return; - - this.jobs.splice(index, 1); - }); - - job.worker = worker; - - this.logger.debug('started worker job', { - id: job.id, - }); - } - - public interval(id: string, interval: number, func: () => void, start: boolean = false): void { - const len = this.jobs.push({ - id, - interval, - func, - started: false, - } as IntervalJob); - - if (start) this.startInterval(this.jobs[len - 1] as IntervalJob); - } - - public worker(id: string, path: string, data: Data, start: boolean = false): WorkerJob { - const len = this.jobs.push({ - id, - path, - data, - started: false, - } as WorkerJob); - - if (start) this.startWorker(this.jobs[len - 1] as WorkerJob); - - return this.jobs[len - 1] as WorkerJob; - } -} diff --git a/src/lib/tasks/index.ts b/src/lib/tasks/index.ts new file mode 100755 index 00000000..2a78b13d --- /dev/null +++ b/src/lib/tasks/index.ts @@ -0,0 +1,127 @@ +import { Worker } from 'worker_threads'; +import Logger, { log } from '../logger'; + +export interface Task { + id: string; + + started: boolean; + logger: Logger; + + tasks: Tasks; +} + +export interface WorkerTask extends Task { + path: string; + data: Data; + + worker?: Worker; +} + +export interface IntervalTask extends Task { + interval: number; + func: () => void; + + timeout?: NodeJS.Timeout; +} + +export class Tasks { + private logger: Logger = log('tasks'); + + public constructor(public tasks: Task[] = []) {} + + public start(): void { + this.logger.debug('starting tasks', { + tasks: this.tasks.length, + }); + + for (const task of this.tasks) { + if (task.started) continue; + + task.logger = this.logger.c(task.id); + + task.tasks = this; + + if ('interval' in task) { + this.logger.debug('running first run', { + id: task.id, + }); + (task as IntervalTask).func.bind(task)(); + + this.startInterval(task as IntervalTask); + } else if ('path' in task) { + this.startWorker(task as WorkerTask); + } + } + } + + private startInterval(task: IntervalTask) { + if (task.interval === 0) { + this.logger.debug('not starting interval', { + id: task.id, + interval: task.interval, + }); + + return; + } + + task.started = true; + + const timeout = setInterval(task.func.bind(task), task.interval); + task.timeout = timeout; + + this.logger.debug('started interval task', { + id: task.id, + interval: task.interval, + }); + } + + private startWorker(task: WorkerTask) { + task.started = true; + + const worker = new Worker(task.path, { + workerData: task.data, + }); + + worker.once('exit', (code) => { + this.logger.debug('worker exited', { + id: task.id, + code, + }); + + const index = this.tasks.findIndex((x) => x.id === task.id); + if (index === -1) return; + + this.tasks.splice(index, 1); + }); + + task.worker = worker; + + this.logger.debug('started worker', { + id: task.id, + }); + } + + public interval(id: string, interval: number, func: () => void, start: boolean = false): void { + const len = this.tasks.push({ + id, + interval, + func, + started: false, + } as IntervalTask); + + if (start) this.startInterval(this.tasks[len - 1] as IntervalTask); + } + + public worker(id: string, path: string, data: Data, start: boolean = false): WorkerTask { + const len = this.tasks.push({ + id, + path, + data, + started: false, + } as WorkerTask); + + if (start) this.startWorker(this.tasks[len - 1] as WorkerTask); + + return this.tasks[len - 1] as WorkerTask; + } +} diff --git a/src/lib/scheduler/jobs/clearInvites.ts b/src/lib/tasks/run/clearInvites.ts similarity index 93% rename from src/lib/scheduler/jobs/clearInvites.ts rename to src/lib/tasks/run/clearInvites.ts index fb996b5b..7daaa990 100755 --- a/src/lib/scheduler/jobs/clearInvites.ts +++ b/src/lib/tasks/run/clearInvites.ts @@ -1,7 +1,7 @@ -import { IntervalJob } from '..'; +import { IntervalTask } from '..'; export default function clearInvites(prisma: typeof globalThis.__db__) { - return async function (this: IntervalJob) { + return async function (this: IntervalTask) { const expiredInvites = await prisma.invite.findMany({ where: { expiresAt: { diff --git a/src/lib/scheduler/jobs/deleteFiles.ts b/src/lib/tasks/run/deleteFiles.ts similarity index 93% rename from src/lib/scheduler/jobs/deleteFiles.ts rename to src/lib/tasks/run/deleteFiles.ts index cf93f2ef..ccd55488 100755 --- a/src/lib/scheduler/jobs/deleteFiles.ts +++ b/src/lib/tasks/run/deleteFiles.ts @@ -1,9 +1,9 @@ import { datasource } from '@/lib/datasource'; -import { IntervalJob } from '..'; +import { IntervalTask } from '..'; import { bytes } from '@/lib/bytes'; export default function deleteFiles(prisma: typeof globalThis.__db__) { - return async function (this: IntervalJob) { + return async function (this: IntervalTask) { const expiredFiles = await prisma.file.findMany({ where: { deletesAt: { diff --git a/src/lib/scheduler/jobs/maxViews.ts b/src/lib/tasks/run/maxViews.ts similarity index 95% rename from src/lib/scheduler/jobs/maxViews.ts rename to src/lib/tasks/run/maxViews.ts index 764881f4..47003472 100755 --- a/src/lib/scheduler/jobs/maxViews.ts +++ b/src/lib/tasks/run/maxViews.ts @@ -1,9 +1,9 @@ import { datasource } from '@/lib/datasource'; -import { IntervalJob } from '..'; +import { IntervalTask } from '..'; import { bytes } from '@/lib/bytes'; export default function maxViews(prisma: typeof globalThis.__db__) { - return async function (this: IntervalJob) { + return async function (this: IntervalTask) { const files = await prisma.file.findMany({ where: { views: { diff --git a/src/lib/scheduler/jobs/metrics.ts b/src/lib/tasks/run/metrics.ts similarity index 81% rename from src/lib/scheduler/jobs/metrics.ts rename to src/lib/tasks/run/metrics.ts index 1ee9a199..35ded37a 100755 --- a/src/lib/scheduler/jobs/metrics.ts +++ b/src/lib/tasks/run/metrics.ts @@ -1,8 +1,8 @@ import { queryStats } from '@/lib/stats'; -import { IntervalJob } from '..'; +import { IntervalTask } from '..'; export default function metrics(prisma: typeof globalThis.__db__) { - return async function (this: IntervalJob) { + return async function (this: IntervalTask) { const stats = await queryStats(); const metric = await prisma.metric.create({ diff --git a/src/lib/scheduler/jobs/thumbnails.ts b/src/lib/tasks/run/thumbnails.ts similarity index 85% rename from src/lib/scheduler/jobs/thumbnails.ts rename to src/lib/tasks/run/thumbnails.ts index 75793c20..9087afc2 100755 --- a/src/lib/scheduler/jobs/thumbnails.ts +++ b/src/lib/tasks/run/thumbnails.ts @@ -1,10 +1,10 @@ -import { IntervalJob, WorkerJob } from '..'; +import { IntervalTask, WorkerTask } from '..'; export default function thumbnails(prisma: typeof globalThis.__db__) { - return async function (this: IntervalJob) { - const thumbnailWorkers = this.scheduler.jobs.filter( + return async function (this: IntervalTask) { + const thumbnailWorkers = this.tasks.tasks.filter( (x) => 'worker' in x && x.id.startsWith('thumbnail'), - ) as unknown as WorkerJob[]; + ) as unknown as WorkerTask[]; if (!thumbnailWorkers.length) return; diff --git a/src/offload/partial.ts b/src/offload/partial.ts index 8ef6bec8..2a176fd0 100755 --- a/src/offload/partial.ts +++ b/src/offload/partial.ts @@ -25,7 +25,7 @@ export type PartialWorkerData = { }; const { user, file, options, responseUrl, domain } = workerData as PartialWorkerData; -const logger = log('scheduler').c('jobs').c('partial').c(file.filename); +const logger = log('tasks').c('partial').c(file.filename); worker(); diff --git a/src/offload/thumbnails.ts b/src/offload/thumbnails.ts index 2ae4024b..b623104c 100755 --- a/src/offload/thumbnails.ts +++ b/src/offload/thumbnails.ts @@ -15,7 +15,7 @@ export type ThumbnailWorkerData = { const { id, enabled } = workerData as ThumbnailWorkerData; -const logger = log('scheduler').c('jobs').c(id); +const logger = log('tasks').c(id); if (isMainThread) { logger.error("thumbnail worker can't run on the main thread"); diff --git a/src/server/index.ts b/src/server/index.ts index afbb721c..48c09d7e 100755 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -3,12 +3,12 @@ import { validateEnv } from '@/lib/config/validate'; import { prisma } from '@/lib/db'; import { runMigrations } from '@/lib/db/migration'; import { log } from '@/lib/logger'; -import { Scheduler } from '@/lib/scheduler'; -import clearInvites from '@/lib/scheduler/jobs/clearInvites'; -import deleteFiles from '@/lib/scheduler/jobs/deleteFiles'; -import maxViews from '@/lib/scheduler/jobs/maxViews'; -import metrics from '@/lib/scheduler/jobs/metrics'; -import thumbnails from '@/lib/scheduler/jobs/thumbnails'; +import { Tasks } from '@/lib/tasks'; +import clearInvites from '@/lib/tasks/run/clearInvites'; +import deleteFiles from '@/lib/tasks/run/deleteFiles'; +import maxViews from '@/lib/tasks/run/maxViews'; +import metrics from '@/lib/tasks/run/metrics'; +import thumbnails from '@/lib/tasks/run/thumbnails'; import { fastifyCookie } from '@fastify/cookie'; import { fastifyCors } from '@fastify/cors'; import { fastifyMultipart } from '@fastify/multipart'; @@ -160,28 +160,25 @@ async function main() { logger.info('server started', { hostname: config.core.hostname, port: config.core.port }); - const scheduler = new Scheduler(); - scheduler.interval('deletefiles', config.scheduler.deleteInterval, deleteFiles(prisma)); - scheduler.interval('maxviews', config.scheduler.maxViewsInterval, maxViews(prisma)); + const tasks = new Tasks(); + tasks.interval('deletefiles', config.tasks.deleteInterval, deleteFiles(prisma)); + tasks.interval('maxviews', config.tasks.maxViewsInterval, maxViews(prisma)); - if (config.features.metrics) - scheduler.interval('metrics', config.scheduler.metricsInterval, metrics(prisma)); + if (config.features.metrics) tasks.interval('metrics', config.tasks.metricsInterval, metrics(prisma)); if (config.features.thumbnails.enabled) { for (let i = 0; i !== config.features.thumbnails.num_threads; ++i) { - scheduler.worker(`thumbnail-${i}`, './build/offload/thumbnails.js', { + tasks.worker(`thumbnail-${i}`, './build/offload/thumbnails.js', { id: `thumbnail-${i}`, enabled: config.features.thumbnails.enabled, }); } - scheduler.interval('thumbnails', config.scheduler.thumbnailsInterval, thumbnails(prisma)); - scheduler.interval('clearinvites', config.scheduler.clearInvitesInterval, clearInvites(prisma)); + tasks.interval('thumbnails', config.tasks.thumbnailsInterval, thumbnails(prisma)); + tasks.interval('clearinvites', config.tasks.clearInvitesInterval, clearInvites(prisma)); } - logger.info('starting scheduler'); - - scheduler.start(); + tasks.start(); } main();