createTorrent maybe works
Some checks failed
ci / test (push) Failing after 5m36s
fp/our CI/CD / build (push) Successful in 43s

CJ_Clippy 2025-11-21 21:53:13 -08:00
parent 72c607266f
commit 2e39bad718
6 changed files with 52 additions and 29 deletions

View File

@@ -5,6 +5,7 @@ import { join, extname, dirname } from "node:path";
 import spawn from "nano-spawn";
 import { stat, mkdir, utimes } from "node:fs/promises";
 import { downloadQueue, downloadQueueEvents } from "../queues/downloadQueue";
+import { formatDate } from "date-fns";

 interface Payload {
   vodId: string;
@@ -94,12 +95,12 @@ export async function getB2FileInfo(job: Job, s3Key: string): Promise<FileInfo>
  * * fail fast
  * * DRY
  */
-export default async function cacheGet(job: Job) {
+export default async function cacheGet(job: Job): Promise<{ cachePath: string, vodId: string, sourceVideo: string }> {
   assertPayload(job.data);
   const pb = await getPocketBaseClient();
   const vodId = job.data.vodId;
-  const vod = await pb.collection('vods').getOne(vodId);
+  const vod = await pb.collection('vods').getOne(vodId, { expand: 'vtubers', requestKey: `cacheGet-${vodId}` });
   const sourceVideo = vod.sourceVideo;
   if (!sourceVideo) throw new Error(`vod ${vodId} is missing a sourceVideo.`);
@@ -108,21 +109,23 @@ export default async function cacheGet(job: Job) {
   const info = await getB2FileInfo(job, vod.sourceVideo);

-  // 1. Determine local path. Use the sha1 for the cachePath if available, otherwise use the filesize.
-  const cachePath = join(cacheRoot, 'vods', vodId, `sourceVideo`, `${info.size}${extname(vod.sourceVideo)}`);
+  // 1. Determine local path. We don't have reliable access to sha1, so we use the file size.
+  // This cachePath is THE cache key, so it should remain stable.
+  const formattedDate = formatDate(vod.streamDate, 'yyyy-MM-dd');
+  const vtubers = vod?.expand?.vtubers?.map((vt: { displayName: string, slug: string }) => vt.slug).join('-');
+  const cachePath = join(cacheRoot, 'vods', vodId, `sourceVideo`, '' + info.size, `${formattedDate}-${vtubers}-${vod.id}${extname(vod.sourceVideo)}`);

   // 1.5 ensure cache dir
   await mkdir(dirname(cachePath), { recursive: true });

   // 2. check if cached
   if ((await fileExists(cachePath))) {
-    job.log(`cache HIT. ${cachePath}`);
-    return cachePath;
+    await job.log(`Cache HIT~. ${cachePath}`);
+    return { cachePath, vodId, sourceVideo };
   }

   // 3. queue deterministic download job
-  job.log(`Cache MISS. Downloading vod ${vodId} to ${cachePath}...`);
+  await job.log(`Cache MISS~. Downloading vod ${vodId} to ${cachePath}...`);
   const downloadJob = await downloadQueue.add(
     'download',
     { vodId, cachePath },
@@ -136,8 +139,8 @@ export default async function cacheGet(job: Job) {
   // 4.5 set access times (used for cache cleanup)
   await utimes(cachePath, Date.now(), Date.now());

-  job.log(`cacheGet complete with file downloaded to ${cachePath}`);
+  await job.log(`cacheGet complete with file downloaded to ${cachePath}`);
+  return { vodId, cachePath, sourceVideo };
 }
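Note: the cache-hit check above calls a fileExists helper that is not included in this diff. A minimal sketch of such a helper, assuming it simply wraps node:fs/promises stat, could look like this:

// Hypothetical sketch, not part of this commit: resolve true if the path can be stat-ed.
import { stat } from "node:fs/promises";

export async function fileExists(path: string): Promise<boolean> {
  try {
    await stat(path);
    return true;
  } catch {
    return false;
  }
}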

View File

@@ -29,7 +29,7 @@ import spawn from "nano-spawn";
 import { join, basename } from 'node:path';
 import { tmpdir } from "node:os";
 import { nanoid } from "nanoid";
-import { formatDate } from "date-fns";
+import { cacheQueue, cacheQueueEvents } from "../queues/cacheQueue";
@@ -163,7 +163,7 @@ export async function createTorrent(job: Job) {
   assertPayload(payload)
   const { vodId } = payload
   const pb = await getPocketBaseClient();
-  const vod = await pb.collection('vods').getOne(vodId, { expand: 'vtubers' });
+  const vod = await pb.collection('vods').getOne(vodId, { expand: 'vtubers', requestKey: `createTorrent-${vodId}` });

   // * [x] load vod
@@ -182,26 +182,35 @@ export async function createTorrent(job: Job) {
   job.log('Creating torrent.');

   // we gotta put the download in a place that qbittorrent docker container can access it
-  const formattedDate = formatDate(vod.streamDate, 'yyyy-MM-dd');
-  const vtubers = vod?.expand?.vtubers?.map((vt: { displayName: string, slug: string }) => vt.slug).join('-');
-  const videoFilePath = join(tmpdir(), `${formattedDate}-${vtubers}-${vod.id}.mp4`);
-
-  // * [x] download video segments from pull-thru cache
-  const dlFrom = `b2://${env.AWS_BUCKET}/${vod.sourceVideo}`;
-  job.log(`downloading ${dlFrom} to ${videoFilePath}`);
-  await spawn('b2', ['file', 'download', dlFrom, videoFilePath]);
-
-  const { magnetLink, torrentFilePath } = await createQBittorrentTorrent(vodId, videoFilePath);
-
-  await uploadTorrentToSeedbox(job, videoFilePath, torrentFilePath);
-
-  job.log(`updating vod record`);
+  job.log(`First we need to get the vod from cache...`);
+  const cacheGetJob = await cacheQueue.add(
+    'cacheGet',
+    { vodId },
+    { jobId: `cache-${vodId}` }
+  );
+
+  // 4. wait up to 3 hours for download to finish
+  const results = (await cacheGetJob.waitUntilFinished(cacheQueueEvents, 1000 * 60 * 60 * 3));
+  await job.log(`cacheGet results: ${JSON.stringify(results)}`);
+  const { cachePath } = results;
+
+  await job.log(`cachePath=${cachePath}. vodId=${vodId}. NEXT UP, create QBittorrentTorrent...`);
+  const { magnetLink, torrentFilePath } = await createQBittorrentTorrent(vodId, cachePath);
+
+  await job.log(`great! torrent created at ${torrentFilePath}. Now let's upload that torrent and the VOD to the seedbox. This will take some time...`);
+  await uploadTorrentToSeedbox(job, cachePath, torrentFilePath);
+
+  job.log(`updating vod record...`);
   await pb.collection('vods').update(vod.id, {
     magnetLink
   });

   job.log(`🏆 torrent creation complete.`);
+  return { magnetLink, cachePath, torrentFilePath, vodId };
 }
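Note: the reworked createTorrent now blocks on cacheGetJob.waitUntilFinished for up to three hours; in BullMQ that call rejects if the cacheGet job fails or the ttl elapses, so the torrent job fails fast instead of hanging. A minimal sketch of isolating that step (assuming the same cacheQueue/cacheQueueEvents exports; getCachedVodPath is a hypothetical helper name, not in this repo):

// Sketch only: enqueue a deterministic cacheGet job and wait for its result.
import type { Job } from "bullmq";
import { cacheQueue, cacheQueueEvents } from "../queues/cacheQueue";

const THREE_HOURS_MS = 1000 * 60 * 60 * 3;

export async function getCachedVodPath(job: Job, vodId: string): Promise<string> {
  const cacheGetJob = await cacheQueue.add('cacheGet', { vodId }, { jobId: `cache-${vodId}` });
  try {
    // Resolves with cacheGet's return value ({ cachePath, vodId, sourceVideo });
    // rejects if the job fails or the ttl is exceeded.
    const { cachePath } = await cacheGetJob.waitUntilFinished(cacheQueueEvents, THREE_HOURS_MS);
    return cachePath;
  } catch (err) {
    await job.log(`cacheGet for vod ${vodId} did not complete: ${err}`);
    throw err;
  }
}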

View File

@@ -50,6 +50,7 @@ async function monitorProgress(cachePath: string, expectedSize: number, job: Job
     try {
       const { size } = await stat(cachePath);
       const progress = Math.min((size / expectedSize) * 100, 100);
+      await job.log(`size:${size}, expectedSize:${expectedSize}, progress:${progress}`);
       await job.updateProgress(progress);
     } catch {
       // file might not exist yet
@@ -89,7 +90,8 @@ export async function __download(job: Job, s3Key: string, cachePath: string) {
 export default async function download(job: Job) {
   assertPayload(job.data);
+  const vodId = job.data.vodId;
   const pb = await getPocketBaseClient();
-  const vod = await pb.collection('vods').getOne(job.data.vodId);
+  const vod = await pb.collection('vods').getOne(vodId, { requestKey: `download-${vodId}` });
   await __download(job, vod.sourceVideo, job.data.cachePath);
 }
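Note: only the inner try/catch of monitorProgress is visible in this diff. A rough sketch consistent with that fragment (the AbortSignal parameter and the 1 second polling interval are assumptions, not from this repo) would poll the growing file and report percent progress to the BullMQ job:

// Sketch only, not the repo's actual monitorProgress implementation.
import { stat } from "node:fs/promises";
import { setTimeout as sleep } from "node:timers/promises";
import type { Job } from "bullmq";

async function monitorProgress(cachePath: string, expectedSize: number, job: Job, signal: AbortSignal): Promise<void> {
  while (!signal.aborted) {
    try {
      const { size } = await stat(cachePath);
      const progress = Math.min((size / expectedSize) * 100, 100);
      await job.updateProgress(progress);
    } catch {
      // file might not exist yet
    }
    await sleep(1000);
  }
}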

View File

@@ -1,7 +1,9 @@
-import { Queue } from 'bullmq';
+import { Queue, QueueEvents } from 'bullmq';
 import { connection } from '../../.config/bullmq.config';

 export const cacheQueue = new Queue('cacheQueue', { connection });
+export const cacheQueueEvents = new QueueEvents("cacheQueue", {
+  connection
+});

 await cacheQueue.upsertJobScheduler(
   'cache-cleanup',

View File

@@ -1,6 +1,6 @@
 import { Queue, QueueEvents } from 'bullmq';
 import { connection } from '../../.config/bullmq.config';
 export const downloadQueue = new Queue('downloadQueue', { connection });
-export const downloadQueueEvents = new QueueEvents("download", {
-  connection,
+export const downloadQueueEvents = new QueueEvents("downloadQueue", {
+  connection
 });
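Note: this rename matters because a QueueEvents instance only receives events for the queue whose name it was constructed with; the old QueueEvents("download") never saw completions from the "downloadQueue" queue, so job.waitUntilFinished(downloadQueueEvents, ...) would stall until its ttl. A small usage sketch (the import path and the vodId/cachePath values are placeholders):

// Sketch only: waiting on a download job works because Queue and QueueEvents
// share the same queue name ("downloadQueue").
import { downloadQueue, downloadQueueEvents } from "./downloadQueue";

const job = await downloadQueue.add('download', {
  vodId: 'example-vod-id',       // placeholder
  cachePath: '/tmp/example.mp4', // placeholder
});
await job.waitUntilFinished(downloadQueueEvents, 1000 * 60 * 60);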

View File

@@ -21,7 +21,7 @@ import { join, basename } from "node:path";
 import { nanoid } from 'nanoid';
 import semverParse from 'semver/functions/parse';
 import { type SemVer } from 'semver';
+import retry from "./retry";

 interface QBittorrentClientOptions {
@@ -402,7 +402,14 @@ export class QBittorrentClient {
   async getInfoHashV2(torrentName: string): Promise<string> {
     console.log(`getInfoHashV2 using torrentName=${torrentName}`)
-    const torrent = await this.__getTorrentInfos(torrentName);
+
+    // __getTorrentInfos can take some time. So we retry it every 1/2 second up to 6 times
+    const torrent = await retry(
+      () => this.__getTorrentInfos(torrentName),
+      6,
+      500
+    );
     return torrent.infohash_v2;
   }
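Note: the retry helper imported from "./retry" is not shown in this diff. Judging from the call site retry(() => this.__getTorrentInfos(torrentName), 6, 500), a minimal sketch of a (fn, attempts, delayMs) helper might be:

// Sketch only, not the actual ./retry module: re-invoke fn until it resolves,
// up to `attempts` times, sleeping `delayMs` milliseconds between attempts.
import { setTimeout as sleep } from "node:timers/promises";

export default async function retry<T>(fn: () => Promise<T>, attempts: number, delayMs: number): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < attempts) await sleep(delayMs);
    }
  }
  throw lastError;
}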