mirror of
https://github.com/diced/zipline.git
synced 2025-12-12 07:40:45 -08:00
refactor: scheduler -> tasks
This commit is contained in:
@@ -104,7 +104,7 @@ FILES_REMOVE_GPS_METADATA=false
|
||||
DATASOURCE_TYPE='local'
|
||||
DATASOURCE_LOCAL_DIRECTORY='./uploads'
|
||||
|
||||
SCHEDULER_METRICS_INTERVAL=1min
|
||||
TASKS_METRICS_INTERVAL=1min
|
||||
|
||||
FILES_MAX_FILE_SIZE=100mb
|
||||
|
||||
|
||||
@@ -20,7 +20,7 @@ export const rawConfig: any = {
|
||||
size: undefined,
|
||||
enabled: undefined,
|
||||
},
|
||||
scheduler: {
|
||||
tasks: {
|
||||
deleteInterval: undefined,
|
||||
clearInvitesInterval: undefined,
|
||||
maxViewsInterval: undefined,
|
||||
@@ -136,11 +136,11 @@ export const PROP_TO_ENV = {
|
||||
'chunks.size': 'CHUNKS_SIZE',
|
||||
'chunks.enabled': 'CHUNKS_ENABLED',
|
||||
|
||||
'scheduler.deleteInterval': 'SCHEDULER_DELETE_INTERVAL',
|
||||
'scheduler.clearInvitesInterval': 'SCHEDULER_CLEAR_INVITES_INTERVAL',
|
||||
'scheduler.maxViewsInterval': 'SCHEDULER_MAX_VIEWS_INTERVAL',
|
||||
'scheduler.thumbnailsInterval': 'SCHEDULER_THUMBNAILS_INTERVAL',
|
||||
'scheduler.metricsInterval': 'SCHEDULER_METRICS_INTERVAL',
|
||||
'tasks.deleteInterval': 'TASKS_DELETE_INTERVAL',
|
||||
'tasks.clearInvitesInterval': 'TASKS_CLEAR_INVITES_INTERVAL',
|
||||
'tasks.maxViewsInterval': 'TASKS_MAX_VIEWS_INTERVAL',
|
||||
'tasks.thumbnailsInterval': 'TASKS_THUMBNAILS_INTERVAL',
|
||||
'tasks.metricsInterval': 'TASKS_METRICS_INTERVAL',
|
||||
|
||||
'files.route': 'FILES_ROUTE',
|
||||
'files.length': 'FILES_LENGTH',
|
||||
@@ -264,11 +264,11 @@ export function readEnv() {
|
||||
env('chunks.size', 'byte'),
|
||||
env('chunks.enabled', 'boolean'),
|
||||
|
||||
env('scheduler.deleteInterval', 'ms'),
|
||||
env('scheduler.clearInvitesInterval', 'ms'),
|
||||
env('scheduler.maxViewsInterval', 'ms'),
|
||||
env('scheduler.thumbnailsInterval', 'ms'),
|
||||
env('scheduler.metricsInterval', 'ms'),
|
||||
env('tasks.deleteInterval', 'ms'),
|
||||
env('tasks.clearInvitesInterval', 'ms'),
|
||||
env('tasks.maxViewsInterval', 'ms'),
|
||||
env('tasks.thumbnailsInterval', 'ms'),
|
||||
env('tasks.metricsInterval', 'ms'),
|
||||
|
||||
env('files.route', 'string'),
|
||||
env('files.length', 'number'),
|
||||
|
||||
@@ -79,7 +79,7 @@ export const schema = z.object({
|
||||
size: z.number().default(bytes('25mb')),
|
||||
enabled: z.boolean().default(true),
|
||||
}),
|
||||
scheduler: z.object({
|
||||
tasks: z.object({
|
||||
deleteInterval: z.number().default(ms('30min')),
|
||||
clearInvitesInterval: z.number().default(ms('30min')),
|
||||
maxViewsInterval: z.number().default(ms('30min')),
|
||||
|
||||
@@ -1,125 +0,0 @@
|
||||
import { Worker } from 'worker_threads';
|
||||
import Logger, { log } from '../logger';
|
||||
|
||||
export interface Job {
|
||||
id: string;
|
||||
|
||||
started: boolean;
|
||||
logger: Logger;
|
||||
scheduler: Scheduler;
|
||||
}
|
||||
|
||||
export interface WorkerJob<Data = any> extends Job {
|
||||
path: string;
|
||||
data: Data;
|
||||
|
||||
worker?: Worker;
|
||||
}
|
||||
|
||||
export interface IntervalJob extends Job {
|
||||
interval: number;
|
||||
func: () => void;
|
||||
|
||||
timeout?: NodeJS.Timeout;
|
||||
}
|
||||
|
||||
export class Scheduler {
|
||||
private logger: Logger = log('scheduler');
|
||||
|
||||
public constructor(public jobs: Job[] = []) {}
|
||||
|
||||
public start(): void {
|
||||
this.logger.debug('starting scheduler', {
|
||||
jobs: this.jobs.length,
|
||||
});
|
||||
|
||||
for (const job of this.jobs) {
|
||||
if (job.started) continue;
|
||||
|
||||
job.logger = this.logger.c('jobs').c(job.id);
|
||||
job.scheduler = this;
|
||||
|
||||
if ('interval' in job) {
|
||||
this.logger.debug('running first run', {
|
||||
id: job.id,
|
||||
});
|
||||
(job as IntervalJob).func.bind(job)();
|
||||
|
||||
this.startInterval(job as IntervalJob);
|
||||
} else if ('path' in job) {
|
||||
this.startWorker(job as WorkerJob);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private startInterval(job: IntervalJob) {
|
||||
if (job.interval === 0) {
|
||||
this.logger.debug('not starting interval', {
|
||||
id: job.id,
|
||||
interval: job.interval,
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
job.started = true;
|
||||
|
||||
const timeout = setInterval(job.func.bind(job), job.interval);
|
||||
job.timeout = timeout;
|
||||
|
||||
this.logger.debug('started interval job', {
|
||||
id: job.id,
|
||||
interval: job.interval,
|
||||
});
|
||||
}
|
||||
|
||||
private startWorker(job: WorkerJob) {
|
||||
job.started = true;
|
||||
|
||||
const worker = new Worker(job.path, {
|
||||
workerData: job.data,
|
||||
});
|
||||
|
||||
worker.once('exit', (code) => {
|
||||
this.logger.debug('worker exited', {
|
||||
id: job.id,
|
||||
code,
|
||||
});
|
||||
|
||||
const index = this.jobs.findIndex((x) => x.id === job.id);
|
||||
if (index === -1) return;
|
||||
|
||||
this.jobs.splice(index, 1);
|
||||
});
|
||||
|
||||
job.worker = worker;
|
||||
|
||||
this.logger.debug('started worker job', {
|
||||
id: job.id,
|
||||
});
|
||||
}
|
||||
|
||||
public interval(id: string, interval: number, func: () => void, start: boolean = false): void {
|
||||
const len = this.jobs.push({
|
||||
id,
|
||||
interval,
|
||||
func,
|
||||
started: false,
|
||||
} as IntervalJob);
|
||||
|
||||
if (start) this.startInterval(this.jobs[len - 1] as IntervalJob);
|
||||
}
|
||||
|
||||
public worker<Data = any>(id: string, path: string, data: Data, start: boolean = false): WorkerJob<Data> {
|
||||
const len = this.jobs.push({
|
||||
id,
|
||||
path,
|
||||
data,
|
||||
started: false,
|
||||
} as WorkerJob<Data>);
|
||||
|
||||
if (start) this.startWorker(this.jobs[len - 1] as WorkerJob<Data>);
|
||||
|
||||
return this.jobs[len - 1] as WorkerJob<Data>;
|
||||
}
|
||||
}
|
||||
127
src/lib/tasks/index.ts
Executable file
127
src/lib/tasks/index.ts
Executable file
@@ -0,0 +1,127 @@
|
||||
import { Worker } from 'worker_threads';
|
||||
import Logger, { log } from '../logger';
|
||||
|
||||
export interface Task {
|
||||
id: string;
|
||||
|
||||
started: boolean;
|
||||
logger: Logger;
|
||||
|
||||
tasks: Tasks;
|
||||
}
|
||||
|
||||
export interface WorkerTask<Data = any> extends Task {
|
||||
path: string;
|
||||
data: Data;
|
||||
|
||||
worker?: Worker;
|
||||
}
|
||||
|
||||
export interface IntervalTask extends Task {
|
||||
interval: number;
|
||||
func: () => void;
|
||||
|
||||
timeout?: NodeJS.Timeout;
|
||||
}
|
||||
|
||||
export class Tasks {
|
||||
private logger: Logger = log('tasks');
|
||||
|
||||
public constructor(public tasks: Task[] = []) {}
|
||||
|
||||
public start(): void {
|
||||
this.logger.debug('starting tasks', {
|
||||
tasks: this.tasks.length,
|
||||
});
|
||||
|
||||
for (const task of this.tasks) {
|
||||
if (task.started) continue;
|
||||
|
||||
task.logger = this.logger.c(task.id);
|
||||
|
||||
task.tasks = this;
|
||||
|
||||
if ('interval' in task) {
|
||||
this.logger.debug('running first run', {
|
||||
id: task.id,
|
||||
});
|
||||
(task as IntervalTask).func.bind(task)();
|
||||
|
||||
this.startInterval(task as IntervalTask);
|
||||
} else if ('path' in task) {
|
||||
this.startWorker(task as WorkerTask);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private startInterval(task: IntervalTask) {
|
||||
if (task.interval === 0) {
|
||||
this.logger.debug('not starting interval', {
|
||||
id: task.id,
|
||||
interval: task.interval,
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
task.started = true;
|
||||
|
||||
const timeout = setInterval(task.func.bind(task), task.interval);
|
||||
task.timeout = timeout;
|
||||
|
||||
this.logger.debug('started interval task', {
|
||||
id: task.id,
|
||||
interval: task.interval,
|
||||
});
|
||||
}
|
||||
|
||||
private startWorker(task: WorkerTask) {
|
||||
task.started = true;
|
||||
|
||||
const worker = new Worker(task.path, {
|
||||
workerData: task.data,
|
||||
});
|
||||
|
||||
worker.once('exit', (code) => {
|
||||
this.logger.debug('worker exited', {
|
||||
id: task.id,
|
||||
code,
|
||||
});
|
||||
|
||||
const index = this.tasks.findIndex((x) => x.id === task.id);
|
||||
if (index === -1) return;
|
||||
|
||||
this.tasks.splice(index, 1);
|
||||
});
|
||||
|
||||
task.worker = worker;
|
||||
|
||||
this.logger.debug('started worker', {
|
||||
id: task.id,
|
||||
});
|
||||
}
|
||||
|
||||
public interval(id: string, interval: number, func: () => void, start: boolean = false): void {
|
||||
const len = this.tasks.push({
|
||||
id,
|
||||
interval,
|
||||
func,
|
||||
started: false,
|
||||
} as IntervalTask);
|
||||
|
||||
if (start) this.startInterval(this.tasks[len - 1] as IntervalTask);
|
||||
}
|
||||
|
||||
public worker<Data = any>(id: string, path: string, data: Data, start: boolean = false): WorkerTask<Data> {
|
||||
const len = this.tasks.push({
|
||||
id,
|
||||
path,
|
||||
data,
|
||||
started: false,
|
||||
} as WorkerTask<Data>);
|
||||
|
||||
if (start) this.startWorker(this.tasks[len - 1] as WorkerTask<Data>);
|
||||
|
||||
return this.tasks[len - 1] as WorkerTask<Data>;
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
import { IntervalJob } from '..';
|
||||
import { IntervalTask } from '..';
|
||||
|
||||
export default function clearInvites(prisma: typeof globalThis.__db__) {
|
||||
return async function (this: IntervalJob) {
|
||||
return async function (this: IntervalTask) {
|
||||
const expiredInvites = await prisma.invite.findMany({
|
||||
where: {
|
||||
expiresAt: {
|
||||
@@ -1,9 +1,9 @@
|
||||
import { datasource } from '@/lib/datasource';
|
||||
import { IntervalJob } from '..';
|
||||
import { IntervalTask } from '..';
|
||||
import { bytes } from '@/lib/bytes';
|
||||
|
||||
export default function deleteFiles(prisma: typeof globalThis.__db__) {
|
||||
return async function (this: IntervalJob) {
|
||||
return async function (this: IntervalTask) {
|
||||
const expiredFiles = await prisma.file.findMany({
|
||||
where: {
|
||||
deletesAt: {
|
||||
@@ -1,9 +1,9 @@
|
||||
import { datasource } from '@/lib/datasource';
|
||||
import { IntervalJob } from '..';
|
||||
import { IntervalTask } from '..';
|
||||
import { bytes } from '@/lib/bytes';
|
||||
|
||||
export default function maxViews(prisma: typeof globalThis.__db__) {
|
||||
return async function (this: IntervalJob) {
|
||||
return async function (this: IntervalTask) {
|
||||
const files = await prisma.file.findMany({
|
||||
where: {
|
||||
views: {
|
||||
@@ -1,8 +1,8 @@
|
||||
import { queryStats } from '@/lib/stats';
|
||||
import { IntervalJob } from '..';
|
||||
import { IntervalTask } from '..';
|
||||
|
||||
export default function metrics(prisma: typeof globalThis.__db__) {
|
||||
return async function (this: IntervalJob) {
|
||||
return async function (this: IntervalTask) {
|
||||
const stats = await queryStats();
|
||||
|
||||
const metric = await prisma.metric.create({
|
||||
@@ -1,10 +1,10 @@
|
||||
import { IntervalJob, WorkerJob } from '..';
|
||||
import { IntervalTask, WorkerTask } from '..';
|
||||
|
||||
export default function thumbnails(prisma: typeof globalThis.__db__) {
|
||||
return async function (this: IntervalJob) {
|
||||
const thumbnailWorkers = this.scheduler.jobs.filter(
|
||||
return async function (this: IntervalTask) {
|
||||
const thumbnailWorkers = this.tasks.tasks.filter(
|
||||
(x) => 'worker' in x && x.id.startsWith('thumbnail'),
|
||||
) as unknown as WorkerJob[];
|
||||
) as unknown as WorkerTask[];
|
||||
|
||||
if (!thumbnailWorkers.length) return;
|
||||
|
||||
@@ -25,7 +25,7 @@ export type PartialWorkerData = {
|
||||
};
|
||||
|
||||
const { user, file, options, responseUrl, domain } = workerData as PartialWorkerData;
|
||||
const logger = log('scheduler').c('jobs').c('partial').c(file.filename);
|
||||
const logger = log('tasks').c('partial').c(file.filename);
|
||||
|
||||
worker();
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ export type ThumbnailWorkerData = {
|
||||
|
||||
const { id, enabled } = workerData as ThumbnailWorkerData;
|
||||
|
||||
const logger = log('scheduler').c('jobs').c(id);
|
||||
const logger = log('tasks').c(id);
|
||||
|
||||
if (isMainThread) {
|
||||
logger.error("thumbnail worker can't run on the main thread");
|
||||
|
||||
@@ -3,12 +3,12 @@ import { validateEnv } from '@/lib/config/validate';
|
||||
import { prisma } from '@/lib/db';
|
||||
import { runMigrations } from '@/lib/db/migration';
|
||||
import { log } from '@/lib/logger';
|
||||
import { Scheduler } from '@/lib/scheduler';
|
||||
import clearInvites from '@/lib/scheduler/jobs/clearInvites';
|
||||
import deleteFiles from '@/lib/scheduler/jobs/deleteFiles';
|
||||
import maxViews from '@/lib/scheduler/jobs/maxViews';
|
||||
import metrics from '@/lib/scheduler/jobs/metrics';
|
||||
import thumbnails from '@/lib/scheduler/jobs/thumbnails';
|
||||
import { Tasks } from '@/lib/tasks';
|
||||
import clearInvites from '@/lib/tasks/run/clearInvites';
|
||||
import deleteFiles from '@/lib/tasks/run/deleteFiles';
|
||||
import maxViews from '@/lib/tasks/run/maxViews';
|
||||
import metrics from '@/lib/tasks/run/metrics';
|
||||
import thumbnails from '@/lib/tasks/run/thumbnails';
|
||||
import { fastifyCookie } from '@fastify/cookie';
|
||||
import { fastifyCors } from '@fastify/cors';
|
||||
import { fastifyMultipart } from '@fastify/multipart';
|
||||
@@ -160,28 +160,25 @@ async function main() {
|
||||
|
||||
logger.info('server started', { hostname: config.core.hostname, port: config.core.port });
|
||||
|
||||
const scheduler = new Scheduler();
|
||||
scheduler.interval('deletefiles', config.scheduler.deleteInterval, deleteFiles(prisma));
|
||||
scheduler.interval('maxviews', config.scheduler.maxViewsInterval, maxViews(prisma));
|
||||
const tasks = new Tasks();
|
||||
tasks.interval('deletefiles', config.tasks.deleteInterval, deleteFiles(prisma));
|
||||
tasks.interval('maxviews', config.tasks.maxViewsInterval, maxViews(prisma));
|
||||
|
||||
if (config.features.metrics)
|
||||
scheduler.interval('metrics', config.scheduler.metricsInterval, metrics(prisma));
|
||||
if (config.features.metrics) tasks.interval('metrics', config.tasks.metricsInterval, metrics(prisma));
|
||||
|
||||
if (config.features.thumbnails.enabled) {
|
||||
for (let i = 0; i !== config.features.thumbnails.num_threads; ++i) {
|
||||
scheduler.worker(`thumbnail-${i}`, './build/offload/thumbnails.js', {
|
||||
tasks.worker(`thumbnail-${i}`, './build/offload/thumbnails.js', {
|
||||
id: `thumbnail-${i}`,
|
||||
enabled: config.features.thumbnails.enabled,
|
||||
});
|
||||
}
|
||||
|
||||
scheduler.interval('thumbnails', config.scheduler.thumbnailsInterval, thumbnails(prisma));
|
||||
scheduler.interval('clearinvites', config.scheduler.clearInvitesInterval, clearInvites(prisma));
|
||||
tasks.interval('thumbnails', config.tasks.thumbnailsInterval, thumbnails(prisma));
|
||||
tasks.interval('clearinvites', config.tasks.clearInvitesInterval, clearInvites(prisma));
|
||||
}
|
||||
|
||||
logger.info('starting scheduler');
|
||||
|
||||
scheduler.start();
|
||||
tasks.start();
|
||||
}
|
||||
|
||||
main();
|
||||
|
||||
Reference in New Issue
Block a user