From 2e39bad7189425067457661bdd025677b1ea6bf6 Mon Sep 17 00:00:00 2001 From: CJ_Clippy Date: Fri, 21 Nov 2025 21:53:13 -0800 Subject: [PATCH] createTorrent maybe works --- services/worker/src/processors/cacheGet.ts | 23 +++++++------ .../worker/src/processors/createTorrent.ts | 33 ++++++++++++------- services/worker/src/processors/download.ts | 4 ++- services/worker/src/queues/cacheQueue.ts | 6 ++-- services/worker/src/queues/downloadQueue.ts | 4 +-- services/worker/src/util/qbittorrent.ts | 11 +++++-- 6 files changed, 52 insertions(+), 29 deletions(-) diff --git a/services/worker/src/processors/cacheGet.ts b/services/worker/src/processors/cacheGet.ts index d425177f..0021fe3a 100644 --- a/services/worker/src/processors/cacheGet.ts +++ b/services/worker/src/processors/cacheGet.ts @@ -5,6 +5,7 @@ import { join, extname, dirname } from "node:path"; import spawn from "nano-spawn"; import { stat, mkdir, utimes } from "node:fs/promises"; import { downloadQueue, downloadQueueEvents } from "../queues/downloadQueue"; +import { formatDate } from "date-fns"; interface Payload { vodId: string; @@ -94,12 +95,12 @@ export async function getB2FileInfo(job: Job, s3Key: string): Promise * * fail fast * * DRY */ -export default async function cacheGet(job: Job) { +export default async function cacheGet(job: Job): Promise<{ cachePath: string, vodId: string, sourceVideo: string }> { assertPayload(job.data); const pb = await getPocketBaseClient(); const vodId = job.data.vodId; - const vod = await pb.collection('vods').getOne(vodId); + const vod = await pb.collection('vods').getOne(vodId, { expand: 'vtubers', requestKey: `cacheGet-${vodId}` }); const sourceVideo = vod.sourceVideo; if (!sourceVideo) throw new Error(`vod ${vodId} is missing a sourceVideo.`); @@ -108,21 +109,23 @@ export default async function cacheGet(job: Job) { const info = await getB2FileInfo(job, vod.sourceVideo); - // 1. Determine local path. Use the sha1 for the cachePath if available, otherwise use the filesize. - const cachePath = join(cacheRoot, 'vods', vodId, `sourceVideo`, `${info.size}${extname(vod.sourceVideo)}`); - + // 1. Determine local path. We don't have reliable access to sha1, so we use the file size. + // This cachePath is THE cache key, so it should remain stable. + const formattedDate = formatDate(vod.streamDate, 'yyyy-MM-dd'); + const vtubers = vod?.expand?.vtubers?.map((vt: { displayName: string, slug: string }) => vt.slug).join('-'); + const cachePath = join(cacheRoot, 'vods', vodId, `sourceVideo`, '' + info.size, `${formattedDate}-${vtubers}-${vod.id}${extname(vod.sourceVideo)}`); // 1.5 ensure cache dir await mkdir(dirname(cachePath), { recursive: true }); // 2. check if cached if ((await fileExists(cachePath))) { - job.log(`cache HIT. ${cachePath}`); - return cachePath; + await job.log(`Cache HIT~. ${cachePath}`); + return { cachePath, vodId, sourceVideo }; } // 3. queue deterministic download job - job.log(`Cache MISS. Downloading vod ${vodId} to ${cachePath}...`); + await job.log(`Cache MISS~. Downloading vod ${vodId} to ${cachePath}...`); const downloadJob = await downloadQueue.add( 'download', { vodId, cachePath }, @@ -136,8 +139,8 @@ export default async function cacheGet(job: Job) { // 4.5 set access times (used for cache cleanup) await utimes(cachePath, Date.now(), Date.now()); - job.log(`cacheGet complete with file downloaded to ${cachePath}`); - + await job.log(`cacheGet complete with file downloaded to ${cachePath}`); + return { vodId, cachePath, sourceVideo }; } diff --git a/services/worker/src/processors/createTorrent.ts b/services/worker/src/processors/createTorrent.ts index 595f7337..a40249c1 100644 --- a/services/worker/src/processors/createTorrent.ts +++ b/services/worker/src/processors/createTorrent.ts @@ -29,7 +29,7 @@ import spawn from "nano-spawn"; import { join, basename } from 'node:path'; import { tmpdir } from "node:os"; import { nanoid } from "nanoid"; -import { formatDate } from "date-fns"; +import { cacheQueue, cacheQueueEvents } from "../queues/cacheQueue"; @@ -163,7 +163,7 @@ export async function createTorrent(job: Job) { assertPayload(payload) const { vodId } = payload const pb = await getPocketBaseClient(); - const vod = await pb.collection('vods').getOne(vodId, { expand: 'vtubers' }); + const vod = await pb.collection('vods').getOne(vodId, { expand: 'vtubers', requestKey: `createTorrent-${vodId}` }); // * [x] load vod @@ -182,26 +182,35 @@ export async function createTorrent(job: Job) { job.log('Creating torrent.'); // we gotta put the download in a place that qbittorrent docker container can access it - const formattedDate = formatDate(vod.streamDate, 'yyyy-MM-dd'); - const vtubers = vod?.expand?.vtubers?.map((vt: { displayName: string, slug: string }) => vt.slug).join('-'); - const videoFilePath = join(tmpdir(), `${formattedDate}-${vtubers}-${vod.id}.mp4`); - // * [x] download video segments from pull-thru cache - const dlFrom = `b2://${env.AWS_BUCKET}/${vod.sourceVideo}`; - job.log(`downloading ${dlFrom} to ${videoFilePath}`); - await spawn('b2', ['file', 'download', dlFrom, videoFilePath]); - const { magnetLink, torrentFilePath } = await createQBittorrentTorrent(vodId, videoFilePath); + job.log(`First we need to get the vod from cache...`); + const cacheGetJob = await cacheQueue.add( + 'cacheGet', + { vodId }, + { jobId: `cache-${vodId}` } + ); - await uploadTorrentToSeedbox(job, videoFilePath, torrentFilePath); - job.log(`updating vod record`); + // 4. wait up to 3 hours for download to finish + const results = (await cacheGetJob.waitUntilFinished(cacheQueueEvents, 1000 * 60 * 60 * 3)); + await job.log(`cacheGet results: ${JSON.stringify(results)}`); + const { cachePath } = results; + + await job.log(`cachePath=${cachePath}. vodId=${vodId}. NEXT UP, create QBittorrentTorrent...`); + + const { magnetLink, torrentFilePath } = await createQBittorrentTorrent(vodId, cachePath); + await job.log(`great! torrent created at ${torrentFilePath}. Now let's upload that torrent and the VOD to the seedbox. This will take some time...`); + await uploadTorrentToSeedbox(job, cachePath, torrentFilePath); + + job.log(`updating vod record...`); await pb.collection('vods').update(vod.id, { magnetLink }); job.log(`🏆 torrent creation complete.`); + return { magnetLink, cachePath, torrentFilePath, vodId }; } \ No newline at end of file diff --git a/services/worker/src/processors/download.ts b/services/worker/src/processors/download.ts index e1264ce0..a3e9dedb 100644 --- a/services/worker/src/processors/download.ts +++ b/services/worker/src/processors/download.ts @@ -50,6 +50,7 @@ async function monitorProgress(cachePath: string, expectedSize: number, job: Job try { const { size } = await stat(cachePath); const progress = Math.min((size / expectedSize) * 100, 100); + await job.log(`size:${size}, expectedSize:${expectedSize}, progress:${progress}`); await job.updateProgress(progress); } catch { // file might not exist yet @@ -89,7 +90,8 @@ export async function __download(job: Job, s3Key: string, cachePath: string) { export default async function download(job: Job) { assertPayload(job.data); + const vodId = job.data.vodId; const pb = await getPocketBaseClient(); - const vod = await pb.collection('vods').getOne(job.data.vodId); + const vod = await pb.collection('vods').getOne(vodId, { requestKey: `download-${vodId}` }); await __download(job, vod.sourceVideo, job.data.cachePath); } \ No newline at end of file diff --git a/services/worker/src/queues/cacheQueue.ts b/services/worker/src/queues/cacheQueue.ts index 0ff961fe..bcbe60be 100644 --- a/services/worker/src/queues/cacheQueue.ts +++ b/services/worker/src/queues/cacheQueue.ts @@ -1,7 +1,9 @@ -import { Queue } from 'bullmq'; +import { Queue, QueueEvents } from 'bullmq'; import { connection } from '../../.config/bullmq.config'; export const cacheQueue = new Queue('cacheQueue', { connection }); - +export const cacheQueueEvents = new QueueEvents("cacheQueue", { + connection +}); await cacheQueue.upsertJobScheduler( 'cache-cleanup', diff --git a/services/worker/src/queues/downloadQueue.ts b/services/worker/src/queues/downloadQueue.ts index ba13aaf5..df6b4227 100644 --- a/services/worker/src/queues/downloadQueue.ts +++ b/services/worker/src/queues/downloadQueue.ts @@ -1,6 +1,6 @@ import { Queue, QueueEvents } from 'bullmq'; import { connection } from '../../.config/bullmq.config'; export const downloadQueue = new Queue('downloadQueue', { connection }); -export const downloadQueueEvents = new QueueEvents("download", { - connection, +export const downloadQueueEvents = new QueueEvents("downloadQueue", { + connection }); \ No newline at end of file diff --git a/services/worker/src/util/qbittorrent.ts b/services/worker/src/util/qbittorrent.ts index 855492d3..661a8733 100644 --- a/services/worker/src/util/qbittorrent.ts +++ b/services/worker/src/util/qbittorrent.ts @@ -21,7 +21,7 @@ import { join, basename } from "node:path"; import { nanoid } from 'nanoid'; import semverParse from 'semver/functions/parse'; import { type SemVer } from 'semver'; - +import retry from "./retry"; interface QBittorrentClientOptions { @@ -402,7 +402,14 @@ export class QBittorrentClient { async getInfoHashV2(torrentName: string): Promise { console.log(`getInfoHashV2 using torrentName=${torrentName}`) - const torrent = await this.__getTorrentInfos(torrentName); + + // __getTorrentInfos can take some time. So we retry it every 1/2 second up to 6 times + const torrent = await retry( + () => this.__getTorrentInfos(torrentName), + 6, + 500 + ); + return torrent.infohash_v2; }