147 lines
4.3 KiB
TypeScript
147 lines
4.3 KiB
TypeScript
import { Job } from "bullmq";
|
|
import { getPocketBaseClient } from "../util/pocketbase";
|
|
import env from "../../.config/env";
|
|
import { join, extname, dirname } from "node:path";
|
|
import spawn from "nano-spawn";
|
|
import { stat, mkdir, utimes } from "node:fs/promises";
|
|
import { downloadQueue, downloadQueueEvents } from "../queues/downloadQueue";
|
|
import { formatDate } from "date-fns";
|
|
|
|
/** Shape of `job.data` accepted by the cacheGet processor. */
interface Payload {
  /** PocketBase record id in the `vods` collection whose sourceVideo should be cached. */
  vodId: string;
}
|
|
|
|
/** Metadata about a B2 object, as reported by the `b2 file info` CLI command. */
interface FileInfo {
  /**
   * SHA-1 checksum reported by B2, or the literal `'none'` when B2 has no checksum
   * for the object. NOTE: `'none'` is absorbed by `string` at the type level, so the
   * compiler will not force callers to handle it — check for it explicitly.
   */
  sha1: string | 'none';
  /** Object size as reported by B2. */
  size: number;
  /** MIME type of the object. */
  contentType: string;
  /** B2 file ID. */
  fileId: string;
}
|
|
|
|
/**
 * Root directory for this worker's on-disk cache: `<CACHE_ROOT>/worker`.
 *
 * Fails fast at module load with a descriptive error when CACHE_ROOT is not
 * configured. Without this check, `path.join` throws an opaque TypeError on
 * `undefined`, and an empty string would silently resolve to a cwd-relative
 * `worker` directory.
 */
const cacheRoot = (() => {
  const root = env?.CACHE_ROOT;
  if (typeof root !== 'string' || root === '') {
    throw new Error('CACHE_ROOT is not configured; cannot determine the worker cache directory.');
  }
  return join(root, 'worker');
})();
|
|
|
|
/**
 * Assert that raw BullMQ job data matches {@link Payload}.
 *
 * The input is treated as untrusted (`unknown`), so every field is checked at runtime
 * before the compiler is told it is a `Payload`.
 *
 * @param payload - the raw `job.data`.
 * @throws Error if `payload` is not a non-null object, if `vodId` is not a string,
 *   or if `vodId` is empty/whitespace.
 */
function assertPayload(payload: unknown): asserts payload is Payload {
  if (typeof payload !== "object" || !payload) throw new Error("invalid payload-- was not an object.");

  // Safe after the object check above: we only read the field and validate it ourselves.
  const { vodId } = payload as { vodId?: unknown };
  if (typeof vodId !== "string") throw new Error("invalid payload-- was missing vodId");

  // A blank id can never resolve to a vod record; fail fast here instead of later in PocketBase.
  if (vodId.trim() === "") throw new Error("invalid payload-- vodId was empty");
}
|
|
|
|
|
|
/**
 * Resolve to `true` only when `path` exists and is a regular file.
 *
 * Any failure from `stat` (missing path, permission problem, ...) resolves to `false`
 * rather than rejecting.
 */
async function fileExists(path: string) {
  return stat(path)
    .then((entry) => entry.isFile())
    .catch(() => false);
}
|
|
|
|
/**
 * Fetches metadata for a B2 object using the CLI.
 *
 * Runs `b2 file info b2://<AWS_BUCKET>/<s3Key>` and parses its JSON output.
 *
 * GOTCHA: Sometimes there is no checksum available via B2. This is because of how the
 * files were uploaded (multipart). An absent/empty `contentSha1` is therefore not an
 * error: `sha1` is returned as the `'none'` sentinel. A literal `"none"` reported by
 * the CLI is passed through unchanged.
 *
 * @param job - BullMQ job, used to log the command being run.
 * @param s3Key - object key within the configured bucket.
 * @returns the sha1 checksum (or `'none'`), the file size, the content type
 *   (`application/octet-stream` when B2 reports none), and the B2 file ID.
 * @throws Error if the CLI fails, if its output is not a JSON object, or if `size`
 *   or `fileId` is missing or malformed.
 */
export async function getB2FileInfo(job: Job, s3Key: string): Promise<FileInfo> {
  const cmd = "b2";
  const args = ["file", "info", `b2://${env.AWS_BUCKET}/${s3Key}`];

  await job.log(`Running ${cmd} ${args.join(' ')}`);

  let stdout: string;
  try {
    const result = await spawn(cmd, args);
    stdout = result.stdout;
  } catch (err: unknown) {
    // The spawn error may carry the captured stderr; read it defensively since `err` is unknown.
    const stderr = typeof err === "object" && err !== null
      ? (err as { stderr?: unknown }).stderr
      : undefined;
    const message = err instanceof Error ? err.message : String(err);
    throw new Error(`Failed to run 'b2 file info': stderr:${stderr} message:${message}`);
  }

  let parsed: unknown;
  try {
    parsed = JSON.parse(stdout);
  } catch {
    throw new Error(`Failed to parse JSON from B2 CLI: ${stdout}`);
  }

  if (typeof parsed !== "object" || parsed === null || Array.isArray(parsed)) {
    throw new Error(`Unexpected B2 file info format for key ${s3Key}. Output was not a JSON object.`);
  }
  const { contentSha1, size, contentType, fileId } = parsed as Record<string, unknown>;

  // Explicit type check rather than truthiness: `!size` would also reject a legitimate 0-byte object.
  if (typeof size !== "number" || !Number.isSafeInteger(size) || size < 0) {
    throw new Error(`Unexpected B2 file info format for key ${s3Key}. size was missing or invalid.`);
  }
  if (typeof fileId !== "string" || fileId === "") {
    throw new Error(`Unexpected B2 file info format for key ${s3Key}. fileId was missing.`);
  }

  return {
    // See GOTCHA above: normalise a missing checksum to the 'none' sentinel instead of failing.
    sha1: typeof contentSha1 === "string" && contentSha1 !== "" ? contentSha1 : "none",
    size,
    contentType: typeof contentType === "string" && contentType !== "" ? contentType : "application/octet-stream",
    fileId,
  };
}
|
|
|
|
|
|
/** Upper bound on how long cacheGet waits for a download job to finish (3 hours). */
const DOWNLOAD_TIMEOUT_MS = 1000 * 60 * 60 * 3;

/**
 * Set a cache entry's access and modification times to the current time.
 *
 * A `Date` is passed rather than `Date.now()`. `fs.utimes` interprets plain numbers as
 * Unix epoch *seconds*, so a millisecond timestamp would be read as a date tens of
 * thousands of years in the future.
 */
async function touchCacheEntry(path: string): Promise<void> {
  const now = new Date();
  await utimes(path, now, now);
}

/**
 * cacheGet
 *
 * Get a sourceVideo file from cache. If not available in the cache, the file is downloaded and cached.
 *
 * The cache key is the path
 * `<cacheRoot>/vods/<vodId>/sourceVideo/<B2 size>/<date>-<vtubers>-<vod id><ext>`.
 * B2 does not reliably provide a checksum, so the object size is used to detect a changed source.
 *
 * Remember to make processors
 *   * idempotent
 *   * fail fast
 *   * DRY
 *
 * @param job - BullMQ job whose data must match `Payload`.
 * @returns the local cache path, the vod id, and the vod's sourceVideo key.
 * @throws Error if the payload is invalid, the vod has no sourceVideo, the B2 lookup fails,
 *   the download job fails or times out, or the file is still absent after the download finished.
 */
export default async function cacheGet(job: Job): Promise<{ cachePath: string, vodId: string, sourceVideo: string }> {
  assertPayload(job.data);
  const pb = await getPocketBaseClient();
  const vodId = job.data.vodId;
  const vod = await pb.collection('vods').getOne(vodId, { expand: 'vtubers', requestKey: `cacheGet-${vodId}` });

  const sourceVideo = vod.sourceVideo;
  if (!sourceVideo) throw new Error(`vod ${vodId} is missing a sourceVideo.`);

  // 0. get filesize from B2. (They don't reliably offer a checksum so size is a compromise.)
  const info = await getB2FileInfo(job, sourceVideo);

  // 1. Determine local path. This cachePath is THE cache key, so it must remain stable.
  const formattedDate = formatDate(vod.streamDate, 'yyyy-MM-dd');
  // NOTE(review): when the vod has no expanded vtubers this is `undefined` and renders as the
  // text "undefined" in the filename. Left as-is because changing it would invalidate existing cache keys.
  const vtubers = vod?.expand?.vtubers?.map((vt: { displayName: string, slug: string }) => vt.slug).join('-');
  const cachePath = join(
    cacheRoot,
    'vods',
    vodId,
    'sourceVideo',
    String(info.size),
    `${formattedDate}-${vtubers}-${vod.id}${extname(sourceVideo)}`,
  );

  // 1.5 ensure cache dir
  await mkdir(dirname(cachePath), { recursive: true });

  // 2. check if cached
  if (await fileExists(cachePath)) {
    await job.log(`Cache HIT~. ${cachePath}`);
    // Access times are used for cache cleanup; refresh them on a hit too so a frequently used
    // entry is not treated as stale. Best-effort: the file is already present, so don't fail the job.
    try {
      await touchCacheEntry(cachePath);
    } catch (err: unknown) {
      const message = err instanceof Error ? err.message : String(err);
      await job.log(`Could not update access times for ${cachePath}: ${message}`);
    }
    return { cachePath, vodId, sourceVideo };
  }

  // 3. queue deterministic download job
  await job.log(`Cache MISS~. Downloading vod ${vodId} to ${cachePath}...`);
  // NOTE(review): the jobId depends only on vodId. If BullMQ still retains a job with this id,
  // add() does not enqueue a new one. That happens, e.g., after cleanup deleted the file, or when
  // cachePath changed because the size changed. The existence check below surfaces that case clearly.
  const downloadJob = await downloadQueue.add(
    'download',
    { vodId, cachePath },
    { jobId: `download-${vodId}` },
  );

  // 4. wait for download to finish
  await downloadJob.waitUntilFinished(downloadQueueEvents, DOWNLOAD_TIMEOUT_MS);

  // Fail fast with context rather than an opaque ENOENT from utimes below.
  if (!(await fileExists(cachePath))) {
    throw new Error(`download job ${downloadJob.id} finished but ${cachePath} is not a file.`);
  }

  // 4.5 set access times (used for cache cleanup)
  await touchCacheEntry(cachePath);

  await job.log(`cacheGet complete with file downloaded to ${cachePath}`);

  return { vodId, cachePath, sourceVideo };
}
|
|
|