feat: somewhat incomplete s3 support

diced
2024-07-13 23:47:40 -07:00
parent 7a0cfc9391
commit ea41a50a14
8 changed files with 1595 additions and 52 deletions

package.json

@@ -21,6 +21,7 @@
   },
   "dependencies": {
     "@ant-design/plots": "^1.2.6",
+    "@aws-sdk/client-s3": "^3.614.0",
     "@fastify/cookie": "^9.3.1",
     "@fastify/cors": "^9.0.1",
     "@fastify/multipart": "^8.2.0",

pnpm-lock.yaml (generated): 1219 lines changed; diff suppressed because it is too large.

src/lib/datasource/Datasource.ts

@@ -3,7 +3,7 @@ import { Readable } from 'stream';

 export abstract class Datasource {
   public name: string | undefined;
-  public abstract get(file: string): null | Readable | Promise<Readable>;
+  public abstract get(file: string): null | Readable | Promise<Readable | null>;
   public abstract put(file: string, data: Buffer): Promise<void>;
   public abstract delete(file: string): Promise<void>;
   public abstract size(file: string): Promise<number>;

src/lib/datasource/Local.ts

@@ -5,7 +5,7 @@ import { Readable } from 'stream';
 import { Datasource } from './Datasource';

 export class LocalDatasource extends Datasource {
-  public name = 'local';
+  name = 'local';

   constructor(public dir: string) {
     super();

src/lib/datasource/S3.ts (new file, 226 lines)

@@ -0,0 +1,226 @@
import { Readable } from 'stream';
import { Datasource } from './Datasource';
import {
  DeleteObjectCommand,
  DeleteObjectsCommand,
  GetObjectCommand,
  ListBucketsCommand,
  ListObjectsCommand,
  PutObjectCommand,
  S3Client,
} from '@aws-sdk/client-s3';
import Logger, { log } from '../logger';
import { ReadableStream } from 'stream/web';

export class S3Datasource extends Datasource {
  name = 's3';
  client: S3Client;

  logger: Logger = log('datasource').c('s3');

  constructor(
    public options: {
      accessKeyId: string;
      secretAccessKey: string;
      region?: string;
      bucket: string;
    },
  ) {
    super();

    this.client = new S3Client({
      credentials: {
        accessKeyId: this.options.accessKeyId,
        secretAccessKey: this.options.secretAccessKey,
      },
      region: this.options.region ?? undefined,
    });

    this.ensureBucketExists();
  }
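
  // Verifies at startup that the configured bucket is visible to these credentials,
  // exiting the process if it is not. Invoked from the constructor without await,
  // since constructors cannot be async; the check runs while the app keeps booting.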
  private async ensureBucketExists() {
    try {
      const res = await this.client.send(new ListBucketsCommand());
      if (res.$metadata.httpStatusCode !== 200) {
        this.logger
          .error('there was an error while listing buckets', res.$metadata as Record<string, unknown>)
          .error('zipline will now exit');
        process.exit(1);
      }

      if (!res.Buckets?.find((bucket) => bucket.Name === this.options.bucket)) {
        this.logger.error(`bucket ${this.options.bucket} does not exist`).error('zipline will now exit');
        process.exit(1);
      }
    } catch (e) {
      this.logger
        .error('there was an error while listing buckets', e as Record<string, unknown>)
        .error('zipline will now exit');
      process.exit(1);
    } finally {
      // process.exit() terminates before `finally` runs, so this only logs on success.
      this.logger.debug(`bucket ${this.options.bucket} exists`);
    }
  }
  public async get(file: string): Promise<Readable | null> {
    const command = new GetObjectCommand({
      Bucket: this.options.bucket,
      Key: file,
    });

    try {
      const res = await this.client.send(command);
      if (res.$metadata.httpStatusCode !== 200) {
        this.logger.error(
          'there was an error while getting object',
          res.$metadata as Record<string, unknown>,
        );
        return null;
      }

      return Readable.fromWeb(res.Body!.transformToWebStream() as ReadableStream<any>);
    } catch (e) {
      this.logger.error('there was an error while getting object', e as Record<string, unknown>);
      return null;
    }
  }
  public async put(file: string, data: Buffer): Promise<void> {
    const command = new PutObjectCommand({
      Bucket: this.options.bucket,
      Key: file,
      Body: data,
    });

    try {
      const res = await this.client.send(command);
      if (res.$metadata.httpStatusCode !== 200) {
        this.logger.error(
          'there was an error while putting object',
          res.$metadata as Record<string, unknown>,
        );
      }
    } catch (e) {
      this.logger.error('there was an error while putting object', e as Record<string, unknown>);
    }
  }
  public async delete(file: string): Promise<void> {
    const command = new DeleteObjectCommand({
      Bucket: this.options.bucket,
      Key: file,
    });

    try {
      const res = await this.client.send(command);
      if (res.$metadata.httpStatusCode !== 200) {
        this.logger.error('there was an error while deleting object');
        this.logger.error('error metadata', res.$metadata as Record<string, unknown>);
      }
    } catch (e) {
      this.logger.error('there was an error while deleting object');
      this.logger.error('error metadata', e as Record<string, unknown>);
    }
  }
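
  // Note: size() issues a full GetObject and only reads ContentLength off the
  // response; a HeadObjectCommand would return the same metadata without the body.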
  public async size(file: string): Promise<number> {
    const command = new GetObjectCommand({
      Bucket: this.options.bucket,
      Key: file,
    });

    try {
      const res = await this.client.send(command);
      if (res.$metadata.httpStatusCode !== 200) {
        this.logger.error('there was an error while getting object');
        this.logger.error('error metadata', res.$metadata as Record<string, unknown>);
        return 0;
      }

      return Number(res.ContentLength);
    } catch (e) {
      this.logger.error('there was an error while getting object');
      this.logger.error('error metadata', e as Record<string, unknown>);
      return 0;
    }
  }
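
  // Note: ListObjects returns at most 1,000 keys per request, so buckets with
  // more objects than that would be under-counted here without pagination.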
  public async totalSize(): Promise<number> {
    const command = new ListObjectsCommand({
      Bucket: this.options.bucket,
    });

    try {
      const res = await this.client.send(command);
      if (res.$metadata.httpStatusCode !== 200) {
        this.logger.error('there was an error while listing objects');
        this.logger.error('error metadata', res.$metadata as Record<string, unknown>);
        return 0;
      }

      return res.Contents?.reduce((acc, obj) => acc + Number(obj.Size), 0) ?? 0;
    } catch (e) {
      this.logger.error('there was an error while listing objects');
      this.logger.error('error metadata', e as Record<string, unknown>);
      return 0;
    }
  }
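
  // Note: as written, Delete.Objects is an empty list, so this request deletes
  // nothing; a full implementation would first list the bucket's keys.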
  public async clear(): Promise<void> {
    const command = new DeleteObjectsCommand({
      Bucket: this.options.bucket,
      Delete: {
        Objects: [],
      },
    });

    try {
      const res = await this.client.send(command);
      if (res.$metadata.httpStatusCode !== 200) {
        this.logger.error('there was an error while deleting objects');
        this.logger.error('error metadata', res.$metadata as Record<string, unknown>);
      }
    } catch (e) {
      this.logger.error('there was an error while deleting objects');
      this.logger.error('error metadata', e as Record<string, unknown>);
    }
  }
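
  // A successful ranged GET answers with HTTP 206 (Partial Content), not 200.
  // On failure this returns a Readable built from a sourceless web ReadableStream,
  // which never emits data and never ends.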
  public async range(file: string, start: number, end: number): Promise<Readable> {
    const command = new GetObjectCommand({
      Bucket: this.options.bucket,
      Key: file,
      Range: `bytes=${start}-${end}`,
    });

    try {
      const res = await this.client.send(command);
      if (res.$metadata.httpStatusCode !== 206) {
        this.logger.error('there was an error while getting object range');
        this.logger.error('error metadata', res.$metadata as Record<string, unknown>);
        return Readable.fromWeb(new ReadableStream());
      }

      return Readable.fromWeb(res.Body!.transformToWebStream() as ReadableStream<any>);
    } catch (e) {
      this.logger.error('there was an error while getting object range');
      this.logger.error('error metadata', e as Record<string, unknown>);
      return Readable.fromWeb(new ReadableStream());
    }
  }
}
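
For context, a minimal sketch of how this datasource could be exercised once constructed (the credentials, region, and bucket below are placeholders, and top-level await is assumed for brevity):

const ds = new S3Datasource({
  accessKeyId: 'AKIAEXAMPLE',      // placeholder
  secretAccessKey: 'example-key',  // placeholder
  region: 'us-east-1',             // optional
  bucket: 'zipline-uploads',       // placeholder; must already exist (see ensureBucketExists)
});

await ds.put('hello.txt', Buffer.from('hello world'));
const stream = await ds.get('hello.txt'); // Readable | null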

src/lib/datasource/index.ts

@@ -2,6 +2,7 @@ import { config } from '../config';
 import { log } from '../logger';
 import { Datasource } from './Datasource';
 import { LocalDatasource } from './Local';
+import { S3Datasource } from './S3';

 let datasource: Datasource;
@@ -18,6 +19,13 @@ if (!global.__datasource__) {
       global.__datasource__ = new LocalDatasource(config.datasource.local!.directory);
       break;
+    case 's3':
+      global.__datasource__ = new S3Datasource({
+        accessKeyId: config.datasource.s3!.accessKeyId,
+        secretAccessKey: config.datasource.s3!.secretAccessKey,
+        region: config.datasource.s3?.region,
+        bucket: config.datasource.s3!.bucket,
+      });
+      break;
     default:
       logger.error(`Datasource type ${config.datasource.type} is not supported`);
       process.exit(1);
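
For reference, the s3 case above implies this shape for config.datasource.s3 (reconstructed from the field accesses; the interface name is hypothetical and the real type lives in the config module):

interface S3DatasourceConfig {
  accessKeyId: string;
  secretAccessKey: string;
  region?: string; // optional, matching the ?. access above
  bucket: string;
}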

(webhook handlers)

@@ -27,19 +27,23 @@ export async function onUpload({ user, file, link }: Parameters<typeof discordOn
     },
   };

-  const res = await fetch(config.httpWebhook.onUpload, {
-    method: 'POST',
-    body: JSON.stringify(payload),
-    headers: {
-      'Content-Type': 'application/json',
-      'x-zipline-webhook': 'true',
-      'x-zipline-webhook-type': 'upload',
-    },
-  });
+  try {
+    const res = await fetch(config.httpWebhook.onUpload, {
+      method: 'POST',
+      body: JSON.stringify(payload),
+      headers: {
+        'Content-Type': 'application/json',
+        'x-zipline-webhook': 'true',
+        'x-zipline-webhook-type': 'upload',
+      },
+    });

-  if (!res.ok) {
-    const text = await res.text();
-    logger.error('webhook failed', { response: text, status: res.status });
+    if (!res.ok) {
+      const text = await res.text();
+      logger.error('webhook failed', { response: text, status: res.status });
+    }
+  } catch (e) {
+    logger.error('error while sending webhook', { error: (e as TypeError).message });
   }

   return;
@@ -67,20 +71,23 @@ export async function onShorten({ user, url, link }: Parameters<typeof discordOn
       link,
     },
   };

-  const res = await fetch(config.httpWebhook.onShorten, {
-    method: 'POST',
-    body: JSON.stringify(payload),
-    headers: {
-      'Content-Type': 'application/json',
-      'x-zipline-webhook': 'true',
-      'x-zipline-webhook-type': 'shorten',
-    },
-  });
+  try {
+    const res = await fetch(config.httpWebhook.onShorten, {
+      method: 'POST',
+      body: JSON.stringify(payload),
+      headers: {
+        'Content-Type': 'application/json',
+        'x-zipline-webhook': 'true',
+        'x-zipline-webhook-type': 'shorten',
+      },
+    });

-  if (!res.ok) {
-    const text = await res.text();
-    logger.error('webhook failed', { response: text, status: res.status });
+    if (!res.ok) {
+      const text = await res.text();
+      logger.error('webhook failed', { response: text, status: res.status });
+    }
+  } catch (e) {
+    logger.error('error while sending webhook', { error: (e as TypeError).message });
   }

   return;
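
Note: fetch() rejects only on network-level failures; HTTP error statuses still resolve normally, which is why the existing !res.ok check moves inside the new try block rather than being replaced by it.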

(partial-upload worker)

@@ -9,6 +9,14 @@ import { UploadOptions } from '@/lib/uploader/parseHeaders';
 import { open, readFile, readdir, rm } from 'fs/promises';
 import { join } from 'path';
 import { isMainThread, workerData } from 'worker_threads';
+import { createReadStream } from 'fs';
+import {
+  CompleteMultipartUploadCommand,
+  CreateMultipartUploadCommand,
+  UploadPartCommand,
+} from '@aws-sdk/client-s3';
+import { datasource } from '@/lib/datasource';
+import { S3Datasource } from '@/lib/datasource/S3';

 export type PartialWorkerData = {
   user: {
@@ -73,39 +81,113 @@ async function worker() {
       },
     });

-    // todo: support s3
-    const fd = await open(join(config.datasource.local!.directory, file.filename), 'w');
+    if (config.datasource.type === 'local') {
+      const fd = await open(join(config.datasource.local!.directory, file.filename), 'w');

-    for (let i = 0; i !== readChunks.length; ++i) {
-      const chunk = readChunks[i];
+      for (let i = 0; i !== readChunks.length; ++i) {
+        const chunk = readChunks[i];

-      const buffer = await readFile(join(config.core.tempDirectory, chunk.file));
-      const { bytesWritten } = await fd.write(buffer, 0, buffer.length, chunk.start);
+        const buffer = await readFile(join(config.core.tempDirectory, chunk.file));
+        const { bytesWritten } = await fd.write(buffer, 0, buffer.length, chunk.start);

-      await rm(join(config.core.tempDirectory, chunk.file));
-      await prisma.incompleteFile.update({
-        where: {
-          id: incompleteFile.id,
-        },
-        data: {
-          chunksComplete: {
-            increment: 1,
-          },
-          status: 'PROCESSING',
-        },
-      });
+        await rm(join(config.core.tempDirectory, chunk.file));
+        await prisma.incompleteFile.update({
+          where: {
+            id: incompleteFile.id,
+          },
+          data: {
+            chunksComplete: {
+              increment: 1,
+            },
+            status: 'PROCESSING',
+          },
+        });

-      logger.debug(`wrote chunk ${i + 1}/${readChunks.length}`, {
-        bytesWritten,
-        start: chunk.start,
-        end: chunk.end,
-      });
-    }
+        logger.debug(`wrote chunk ${i + 1}/${readChunks.length}`, {
+          bytesWritten,
+          start: chunk.start,
+          end: chunk.end,
+        });
+      }

-    await fd.close();
+      await fd.close();
+    } else if (config.datasource.type === 's3') {
+      const s3datasource = datasource as S3Datasource;
+
+      const { UploadId } = await s3datasource.client.send(
+        new CreateMultipartUploadCommand({ Bucket: s3datasource.options.bucket, Key: file.filename }),
+      );
+
+      const partResults = [];
+
+      for (let i = 0; i !== readChunks.length; ++i) {
+        const chunk = readChunks[i];
+        const stream = createReadStream(join(config.core.tempDirectory, chunk.file));
+
+        try {
+          const res = await s3datasource.client.send(
+            new UploadPartCommand({
+              Bucket: s3datasource.options.bucket,
+              Key: file.filename,
+              UploadId,
+              PartNumber: i + 1,
+              Body: stream,
+              ContentLength: chunk.end - chunk.start,
+            }),
+          );
+
+          logger.debug(`uploaded chunk to s3 ${i + 1}/${readChunks.length}`, {
+            ETag: res.ETag,
+            start: chunk.start,
+            end: chunk.end,
+          });
+
+          partResults.push({
+            ETag: res.ETag,
+            PartNumber: i + 1,
+          });
+        } catch (e) {
+          logger.error('error while uploading chunk');
+          console.error(e);
+          return;
+        } finally {
+          await rm(join(config.core.tempDirectory, chunk.file));
+          await prisma.incompleteFile.update({
+            where: {
+              id: incompleteFile.id,
+            },
+            data: {
+              chunksComplete: {
+                increment: 1,
+              },
+              status: 'PROCESSING',
+            },
+          });
+        }
+      }
+
+      try {
+        await s3datasource.client.send(
+          new CompleteMultipartUploadCommand({
+            Bucket: s3datasource.options.bucket,
+            Key: file.filename,
+            UploadId,
+            MultipartUpload: {
+              Parts: partResults,
+            },
+          }),
+        );
+
+        logger.debug('completed multipart upload for s3');
+      } catch (e) {
+        logger.error('error while completing multipart upload');
+        console.error(e);
+        return;
+      }
+    }

     await prisma.incompleteFile.update({
       where: {
         id: incompleteFile.id,