mirror of
https://github.com/diced/zipline.git
synced 2025-12-12 15:50:11 -08:00
fix: better thumbnail logic
This commit is contained in:
@@ -1,14 +1,20 @@
|
||||
import { Readable } from 'stream';
|
||||
|
||||
export type DatasourceQueryOperation = {
|
||||
type: 'startsWith';
|
||||
query: string;
|
||||
};
|
||||
|
||||
export abstract class Datasource {
|
||||
public name: string | undefined;
|
||||
|
||||
public abstract get(file: string): null | Readable | Promise<Readable | null>;
|
||||
public abstract put(file: string, data: Buffer | string, options?: { mimetype?: string }): Promise<void>;
|
||||
public abstract delete(file: string): Promise<void>;
|
||||
public abstract delete(file: string | string[]): Promise<void>;
|
||||
public abstract size(file: string): Promise<number>;
|
||||
public abstract totalSize(): Promise<number>;
|
||||
public abstract clear(): Promise<void>;
|
||||
public abstract range(file: string, start: number, end: number): Promise<Readable>;
|
||||
public abstract rename(from: string, to: string): Promise<void>;
|
||||
public abstract query(operation: DatasourceQueryOperation): Promise<string[]>;
|
||||
}
|
||||
|
||||
@@ -2,7 +2,7 @@ import { createReadStream, existsSync } from 'fs';
|
||||
import { access, constants, readdir, rename, rm, stat, writeFile } from 'fs/promises';
|
||||
import { join } from 'path';
|
||||
import { Readable } from 'stream';
|
||||
import { Datasource } from './Datasource';
|
||||
import { Datasource, DatasourceQueryOperation } from './Datasource';
|
||||
|
||||
async function existsAndCanRW(path: string): Promise<boolean> {
|
||||
try {
|
||||
@@ -40,14 +40,19 @@ export class LocalDatasource extends Datasource {
|
||||
"Something went very wrong! the temporary directory wasn't readable or the file doesn't exist.",
|
||||
);
|
||||
|
||||
console.log(`Moving file from ${data} to ${path}`);
|
||||
return rename(data, path);
|
||||
}
|
||||
|
||||
return writeFile(path, data);
|
||||
}
|
||||
|
||||
public async delete(file: string): Promise<void> {
|
||||
public async delete(file: string | string[]): Promise<void> {
|
||||
if (Array.isArray(file)) {
|
||||
await Promise.all(file.map((f) => this.delete(f)));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const path = join(this.dir, file);
|
||||
if (!existsSync(path)) return Promise.resolve();
|
||||
|
||||
@@ -92,4 +97,13 @@ export class LocalDatasource extends Datasource {
|
||||
|
||||
return rename(fromPath, toPath);
|
||||
}
|
||||
|
||||
public async query(operation: DatasourceQueryOperation): Promise<string[]> {
|
||||
if (operation.type !== 'startsWith') {
|
||||
throw new Error(`Unsupported query operation type: ${operation.type}`);
|
||||
}
|
||||
|
||||
const files = await readdir(this.dir);
|
||||
return files.filter((file) => file.startsWith(operation.query));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ import { Readable } from 'stream';
|
||||
import { ReadableStream } from 'stream/web';
|
||||
import Logger, { log } from '../logger';
|
||||
import { randomCharacters } from '../random';
|
||||
import { Datasource } from './Datasource';
|
||||
import { Datasource, DatasourceQueryOperation } from './Datasource';
|
||||
|
||||
function isOk(code: number) {
|
||||
return code >= 200 && code < 300;
|
||||
@@ -200,14 +200,25 @@ export class S3Datasource extends Datasource {
|
||||
}
|
||||
}
|
||||
|
||||
public async delete(file: string): Promise<void> {
|
||||
const command = new DeleteObjectCommand({
|
||||
Bucket: this.options.bucket,
|
||||
Key: this.key(file),
|
||||
});
|
||||
public async delete(file: string | string[]): Promise<void> {
|
||||
let command: DeleteObjectCommand | DeleteObjectsCommand;
|
||||
|
||||
if (Array.isArray(file)) {
|
||||
command = new DeleteObjectsCommand({
|
||||
Bucket: this.options.bucket,
|
||||
Delete: {
|
||||
Objects: file.map((f) => ({ Key: this.key(f) })),
|
||||
},
|
||||
});
|
||||
} else {
|
||||
command = new DeleteObjectCommand({
|
||||
Bucket: this.options.bucket,
|
||||
Key: this.key(file),
|
||||
});
|
||||
}
|
||||
|
||||
try {
|
||||
const res = await this.client.send(command);
|
||||
const res = await this.client.send(command as never);
|
||||
|
||||
if (!isOk(res.$metadata.httpStatusCode || 0)) {
|
||||
this.logger.error('there was an error while deleting object');
|
||||
@@ -350,4 +361,34 @@ export class S3Datasource extends Datasource {
|
||||
throw new Error('Failed to rename object');
|
||||
}
|
||||
}
|
||||
|
||||
public async query(operation: DatasourceQueryOperation): Promise<string[]> {
|
||||
if (operation.type !== 'startsWith') {
|
||||
throw new Error(`Unsupported query operation type: ${operation.type}`);
|
||||
}
|
||||
|
||||
const command = new ListObjectsCommand({
|
||||
Bucket: this.options.bucket,
|
||||
Prefix: this.key(operation.query),
|
||||
Delimiter: this.options.subdirectory ? undefined : '/',
|
||||
});
|
||||
|
||||
try {
|
||||
const res = await this.client.send(command);
|
||||
|
||||
if (!isOk(res.$metadata.httpStatusCode || 0)) {
|
||||
this.logger.error('there was an error while listing objects');
|
||||
this.logger.error('error metadata', res.$metadata as Record<string, unknown>);
|
||||
|
||||
return [];
|
||||
}
|
||||
|
||||
return res.Contents?.map((obj) => obj.Key!.replace(this.key(''), '')) || [];
|
||||
} catch (e) {
|
||||
this.logger.error('there was an error while listing objects');
|
||||
this.logger.error('error metadata', e as Record<string, unknown>);
|
||||
|
||||
return [];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { bytes } from '@/lib/bytes';
|
||||
import { datasource } from '@/lib/datasource';
|
||||
import { IntervalTask } from '..';
|
||||
import { bytes } from '@/lib/bytes';
|
||||
|
||||
export default function deleteFiles(prisma: typeof globalThis.__db__) {
|
||||
return async function (this: IntervalTask) {
|
||||
|
||||
@@ -1,7 +1,19 @@
|
||||
import { datasource } from '@/lib/datasource';
|
||||
import { IntervalTask, WorkerTask } from '..';
|
||||
|
||||
export default function thumbnails(prisma: typeof globalThis.__db__) {
|
||||
return async function (this: IntervalTask, rerun = false) {
|
||||
// delete orphaned thumbnail files
|
||||
this.logger.debug('preparing to delete orphaned thumbnail files');
|
||||
|
||||
const files = await datasource.query({ type: 'startsWith', query: '.thumbnail.' });
|
||||
this.logger.debug(`found ${files.length} orphaned thumbnail files`);
|
||||
|
||||
if (files.length) {
|
||||
await datasource.delete(files);
|
||||
}
|
||||
|
||||
// run thumbnails
|
||||
const thumbnailWorkers = this.tasks.tasks.filter(
|
||||
(x) => 'worker' in x && x.id.startsWith('thumbnail'),
|
||||
) as unknown as WorkerTask[];
|
||||
|
||||
@@ -219,11 +219,14 @@ async function main() {
|
||||
// Tasks
|
||||
tasks.interval('deletefiles', ms(config.tasks.deleteInterval as StringValue), deleteFiles(prisma));
|
||||
tasks.interval('maxviews', ms(config.tasks.maxViewsInterval as StringValue), maxViews(prisma));
|
||||
tasks.interval('clearinvites', ms(config.tasks.clearInvitesInterval as StringValue), clearInvites(prisma));
|
||||
|
||||
if (config.features.metrics)
|
||||
tasks.interval('metrics', ms(config.tasks.metricsInterval as StringValue), metrics(prisma));
|
||||
|
||||
if (config.features.thumbnails.enabled) {
|
||||
tasks.interval('thumbnails', ms(config.tasks.thumbnailsInterval as StringValue), thumbnails(prisma));
|
||||
|
||||
for (let i = 0; i !== config.features.thumbnails.num_threads; ++i) {
|
||||
tasks.worker(
|
||||
`thumbnail-${i}`,
|
||||
@@ -263,13 +266,6 @@ async function main() {
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
tasks.interval('thumbnails', ms(config.tasks.thumbnailsInterval as StringValue), thumbnails(prisma));
|
||||
tasks.interval(
|
||||
'clearinvites',
|
||||
ms(config.tasks.clearInvitesInterval as StringValue),
|
||||
clearInvites(prisma),
|
||||
);
|
||||
}
|
||||
|
||||
tasks.start();
|
||||
|
||||
Reference in New Issue
Block a user