diff --git a/services/pocketbase/pb_hooks/pages/_private/vod-list.ejs b/services/pocketbase/pb_hooks/pages/_private/vod-list.ejs
index e13088ff..2fdfdd88 100644
--- a/services/pocketbase/pb_hooks/pages/_private/vod-list.ejs
+++ b/services/pocketbase/pb_hooks/pages/_private/vod-list.ejs
@@ -6,7 +6,8 @@
| Stream Date |
VTuber |
Thumbnail |
- DL |
+ Torrent |
+ Magnet Link |
@@ -38,6 +39,18 @@
No thumbnail
<% } %>
+
+ <% if (vod?.torrent) { %>
+
+
+
+
+
+
+ <% } %>
+ |
<% if (vod?.magnetLink) { %>
diff --git a/services/pocketbase/utils/sign-url.js b/services/pocketbase/utils/sign-url.js
index e574a7d2..180ff74e 100644
--- a/services/pocketbase/utils/sign-url.js
+++ b/services/pocketbase/utils/sign-url.js
@@ -62,12 +62,5 @@ function signUrl(url, securityKey, expirationTime = 103600, userIp, isDirectory
}
}
-const securityKey = 'd5814175-cc56-4098-ae63-1096301fb3c1';
-const sampleUrl = 'https://fppbdev.b-cdn.net/pbc_3872109612/z0bpy5cwxi1uksv/g4ot5_omb_qaei_o7_y_tuzdlcn1se.jpeg';
-// const expires = Math.round(Date.now() / 1000) + 3600;
-const expires = 3524904301;
-const signedUrl = signUrl(sampleUrl, securityKey, expires, null, false, '');
-
-console.log(`signedUrl=${signedUrl}`);
module.exports = { signUrl };
\ No newline at end of file
diff --git a/services/worker/README.md b/services/worker/README.md
index dfb7aede..73635f47 100644
--- a/services/worker/README.md
+++ b/services/worker/README.md
@@ -1,15 +1,27 @@
# worker
-To install dependencies:
+To install node dependencies:
```bash
-bun install
+npm install
```
To run:
```bash
-bun run index.ts
+npm run start
```
-This project was created using `bun init` in bun v1.3.1. [Bun](https://bun.com) is a fast all-in-one JavaScript runtime.
+
+
+## External Dependencies
+
+Worker needs these packages installed in order to be successful. Please make sure these are installed in the environment.
+
+* vcsi
+* whisper-cli
+* yolo
+* ffmpeg
+* qbittorrent-nox
+* b2cli
+* valkey
\ No newline at end of file
diff --git a/services/worker/src/index.ts b/services/worker/src/index.ts
index c54d9cce..64fb1cdc 100644
--- a/services/worker/src/index.ts
+++ b/services/worker/src/index.ts
@@ -1,7 +1,6 @@
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
-import { type JobsOptions } from 'bullmq';
import express, { type Request, type Response } from 'express';
import { generalQueue } from './queues/generalQueue.ts';
import { gpuQueue } from './queues/gpuQueue.ts';
@@ -11,6 +10,7 @@ import { version } from '../package.json';
import { downloadQueue } from './queues/downloadQueue.ts';
import { cacheQueue } from './queues/cacheQueue.ts';
import { muxQueue } from './queues/muxQueue.ts';
+import { b2Queue } from './queues/b2Queue.ts';
const run = async () => {
@@ -30,6 +30,7 @@ const run = async () => {
new BullMQAdapter(downloadQueue),
new BullMQAdapter(cacheQueue),
new BullMQAdapter(muxQueue),
+ new BullMQAdapter(b2Queue),
],
serverAdapter,
});
diff --git a/services/worker/src/processors/cacheCleanup.ts b/services/worker/src/processors/cacheCleanup.ts
index 020714d1..6c690d20 100644
--- a/services/worker/src/processors/cacheCleanup.ts
+++ b/services/worker/src/processors/cacheCleanup.ts
@@ -3,7 +3,7 @@ import fs from "node:fs/promises";
import { join } from "node:path";
import env from "../../.config/env";
-const retainmentDayCount = 90;
+const retainmentDayCount = 2;
/**
* cacheCleanup
diff --git a/services/worker/src/processors/createTorrent.ts b/services/worker/src/processors/createTorrent.ts
index fc595ad9..529493ad 100644
--- a/services/worker/src/processors/createTorrent.ts
+++ b/services/worker/src/processors/createTorrent.ts
@@ -20,16 +20,13 @@
-import env from "../../.config/env";
import { sshClient } from "../util/sftp";
import { qbtClient, QBTorrentInfo } from "../util/qbittorrent";
import { Job } from "bullmq";
import { getPocketBaseClient } from "../util/pocketbase";
-import spawn from "nano-spawn";
-import { join, basename } from 'node:path';
-import { tmpdir } from "node:os";
-import { nanoid } from "nanoid";
+import { basename } from 'node:path';
import { cacheQueue, cacheQueueEvents } from "../queues/cacheQueue";
+import { readFile } from "node:fs/promises";
@@ -211,12 +208,12 @@ export async function createTorrent(job: Job) {
const cacheGetJob = await cacheQueue.add(
'cacheGet',
{ vodId },
- { jobId: `cache-${vodId}` }
+ { jobId: `cacheGet-${vodId}` }
);
- // 4. wait up to 3 hours for download to finish
- const results = (await cacheGetJob.waitUntilFinished(cacheQueueEvents, 1000 * 60 * 60 * 3));
+ // 4. wait up to 5 hours for download to finish
+ const results = (await cacheGetJob.waitUntilFinished(cacheQueueEvents, 1000 * 60 * 60 * 5));
await job.log(`cacheGet results: ${JSON.stringify(results)}`);
const { cachePath } = results;
@@ -230,10 +227,18 @@ export async function createTorrent(job: Job) {
await job.log(`great! torrent created at ${torrentFilePath}. Now let's upload that torrent and the VOD to the seedbox. This will take some time...`);
await uploadTorrentToSeedbox(job, cachePath, torrentFilePath);
- job.log(`updating vod record...`);
- await pb.collection('vods').update(vod.id, {
- magnetLink
- });
+ job.log(`Updating vod record in the db. This involes sending a multipart/form to pocketbase with the .torrent file as a buffer and the magnetLink as text...`);
+ const formData = new FormData();
+
+ const torrentBuffer = await readFile(torrentFilePath)
+ formData.append('torrent', new Blob([torrentBuffer]), basename(torrentFilePath));
+ formData.append('magnetLink', magnetLink);
+
+ await pb.collection('vods').update(vod.id, formData);
+
+
+
+
job.log(`Torrent creation complete.`);
await job.updateProgress(100);
diff --git a/services/worker/src/processors/createVideoThumbnail.ts b/services/worker/src/processors/createVideoThumbnail.ts
index 7d8823ad..39196811 100644
--- a/services/worker/src/processors/createVideoThumbnail.ts
+++ b/services/worker/src/processors/createVideoThumbnail.ts
@@ -1,33 +1,73 @@
-import type { Helpers } from "graphile-worker";
-import { PrismaClient } from "../../generated/prisma";
-import { withAccelerate } from "@prisma/extension-accelerate";
-import { getOrDownloadAsset } from "../utils/cache";
-import { env } from "../config/env";
-import { getS3Client, uploadFile } from "../utils/s3";
-import { nanoid } from "nanoid";
-import { getNanoSpawn } from "../utils/nanoSpawn";
-import { generateS3Path } from "../utils/formatters";
-import logger from "../utils/logger";
-import { preparePython } from "../utils/python";
-
-const prisma = new PrismaClient().$extends(withAccelerate());
-
+import env from "../../.config/env";
+import { Job } from "bullmq";
+import { getPocketBaseClient } from "../util/pocketbase";
+import spawn from 'nano-spawn';
+import { readFile } from "fs/promises";
+import { basename } from "node:path";
+import { cacheQueue, cacheQueueEvents } from '../queues/cacheQueue';
interface Payload {
vodId: string;
}
-async function createThumbnail(helpers: Helpers, inputFilePath: string) {
- logger.debug(`createThumbnail with inputFilePath=${inputFilePath}`)
+
+function assertPayload(payload: any): asserts payload is Payload {
+ if (typeof payload !== "object" || !payload) throw new Error("invalid payload-- was not an object.");
+ if (typeof payload.vodId !== "string") throw new Error("invalid payload-- was missing vodId");
+}
+
+/**
+ *
+ * createVideoThumbnail
+ *
+ * uses vcsi to create a 5x5 grid of video frames
+ */
+export async function createVideoThumbnail(job: Job) {
+
+ assertPayload(job.data);
+ const vodId = job.data.vodId;
+ const pb = await getPocketBaseClient();
+ const vod = await pb.collection('vods').getOne(vodId);
+
+ job.log(`createVideoThumbnail for ${vodId} starting.`);
+
+ job.log(`pulling sourceVideo from cache...`);
+
+ const cacheJob = await cacheQueue.add(
+ 'cacheGet',
+ { vodId },
+ { jobId: `cacheGet-${vodId}` }
+ );
+
+
+ // 4. wait for cache/download to finish
+ const result = await cacheJob.waitUntilFinished(cacheQueueEvents, 1000 * 60 * 60 * 3);
+ const cachePath = result.cachePath;
+
+ // 5. create thumbnail
+ const thumbnailFilePath = await __createThumbnail(cachePath);
+
+
+ // 6. update db record
+ const formData = new FormData();
+
+ const thumbnailBuffer = await readFile(thumbnailFilePath)
+ formData.append('thumbnail', new Blob([thumbnailBuffer]), basename(thumbnailFilePath));
+ await pb.collection('vods').update(vod.id, formData);
+
+}
+
+
+async function __createThumbnail(inputFilePath: string) {
if (!inputFilePath) {
throw new Error("inputFilePath is missing");
}
const outputFilePath = inputFilePath.replace(/\.[^/.]+$/, '') + '-thumb.png';
- const spawn = await getNanoSpawn();
- const result = await spawn('vcsi', [
+
+ await spawn('vcsi', [
inputFilePath,
'--metadata-position', 'hidden',
'--metadata-margin', '0',
@@ -46,81 +86,9 @@ async function createThumbnail(helpers: Helpers, inputFilePath: string) {
], {
stdout: 'inherit',
stderr: 'inherit',
- cwd: env.APP_DIR,
+ cwd: env.CACHE_ROOT,
});
- logger.debug('result as follows')
- logger.debug(JSON.stringify(result, null, 2))
-
- logger.info(`✅ Thumbnail saved to: ${outputFilePath}`);
return outputFilePath
}
-
-
-function assertPayload(payload: any): asserts payload is Payload {
- if (typeof payload !== "object" || !payload) throw new Error("invalid payload-- was not an object.");
- if (typeof payload.vodId !== "string") throw new Error("invalid payload-- was missing vodId");
-}
-
-
-export default async function createVideoThumbnail(payload: any, helpers: Helpers) {
- assertPayload(payload)
- const { vodId } = payload
- const vod = await prisma.vod.findFirstOrThrow({
- where: {
- id: vodId
- },
- include: {
- vtubers: {
- select: {
- slug: true,
- id: true,
- }
- }
- }
- })
- // * [x] load vod
-
-
- // * [x] exit if video.thumbnail already defined
- if (vod.thumbnail) {
- logger.info(`Doing nothing-- vod ${vodId} already has a thumbnail.`)
- return; // Exit the function early
- }
-
- if (!vod.sourceVideo) {
- throw new Error(`Failed to create thumbnail-- vod ${vodId} is missing a sourceVideo.`);
- }
-
-
- logger.info('Creating Video Thumbnail')
- const s3Client = getS3Client()
-
- // * [x] download video segments from pull-thru cache
- const videoFilePath = await getOrDownloadAsset(s3Client, env.S3_BUCKET, vod.sourceVideo)
- logger.debug(`videoFilePath=${videoFilePath}`)
-
- // * [x] run vcsi
- const thumbnailPath = await createThumbnail(helpers, videoFilePath)
- logger.debug(`thumbnailPath=${thumbnailPath}`)
-
- // * [x] generate thumbnail s3 key
- const slug = vod.vtubers[0].slug
- if (!slug) throw new Error(`vtuber ${vod.vtubers[0].id} was missing slug`);
- const s3Key = generateS3Path(slug, vod.streamDate, vod.id, `thumbnail.png`);
-
-
- // * [x] upload thumbnail to s3
- await uploadFile(s3Client, env.S3_BUCKET, s3Key, thumbnailPath, 'image/png')
-
- // * [x] update vod record
- await prisma.vod.update({
- where: { id: vodId },
- data: { thumbnail: s3Key }
- });
-
- // * [x] done
-
-
-}
\ No newline at end of file
diff --git a/services/worker/src/processors/findWork.ts b/services/worker/src/processors/findWork.ts
index 54316cb0..6bde88e4 100644
--- a/services/worker/src/processors/findWork.ts
+++ b/services/worker/src/processors/findWork.ts
@@ -1,12 +1,15 @@
import { Job, Queue } from "bullmq";
-import { getPocketBaseClient } from "../util/pocketbase";
import Client from "pocketbase";
-import { generalQueue } from "../queues/generalQueue";
-import { muxQueue } from "../queues/muxQueue";
+import { getPocketBaseClient } from "../util/pocketbase.ts";
+import { generalQueue } from "../queues/generalQueue.ts";
+import { muxQueue } from "../queues/muxQueue.ts";
+import { b2Queue } from "../queues/b2Queue.ts";
+import { shuffle } from "../util/random.ts";
const queues: Record = {
generalQueue: generalQueue,
muxQueue: muxQueue,
+ b2Queue: b2Queue,
};
type VodJobConfig = {
@@ -17,23 +20,40 @@ type VodJobConfig = {
};
async function handleMissing(job: Job, pb: Client, config: VodJobConfig) {
- const results = await pb.collection('vods').getList(1, 1, {
+
+
+ // Sometimes, we can run into a softlock state.
+ // If procesing a vod repeatedly fails, findWork will fail to queue new jobs because of task deduplication.
+ // @see https://docs.bullmq.io/patterns/throttle-jobs
+ // To overcome this, we randomly choose a vod from a list of the latest 3.
+ // If one vod gets permafailed, we have 2 more to continue working.
+ // This is an imperfect solution because the one permafailed vod must be rectified by an admin.
+ // @todo figure out a better way to handle permafailed vod processing tasks that isn't a ticking timebomb.
+ const results = await pb.collection('vods').getList(1, 3, {
filter: config.filter,
sort: '-created',
});
const vods = results.items;
- if (!vods.length) return; // nothing to do
+ if (!vods.length) {
+ // job.log(`No vods matching filter [${config.filter}]. Nothing to do.`)
+ return;
+ }
- const vod = vods[0];
+
+ const vod = shuffle(vods).at(0);
+ if (!vod) {
+ throw new Error('no vod found after shuffling');
+ };
const vodId = vod.id;
job.log(config.logMessage(vodId));
const jobId = `${config.processorName}-${vodId}`;
+ const attempts = 3;
- const queue = queues[config.queueName]; // <-- look here
- await queue.add(config.processorName, { vodId }, { jobId });
+ const queue = queues[config.queueName];
+ await queue.add(config.processorName, { vodId }, { jobId, attempts });
}
// export async function handleMissingTorrent(job: Job, pb: Client) {
@@ -125,12 +145,21 @@ export async function handleMissingStreamDate(job: Job, pb: Client) {
export async function handleMissingSourceVideo(job: Job, pb: Client) {
return handleMissing(job, pb, {
filter: "videoSrcB2 != '' && sourceVideo = ''",
- queueName: 'generalQueue',
+ queueName: 'b2Queue',
processorName: 'copyV1VideoToV3',
logMessage: (id) => `findWork found ${id} in need of a source video.`
});
}
+export async function handleMissingThumbnail(job: Job, pb: Client) {
+ return handleMissing(job, pb, {
+ filter: "sourceVideo != '' && thumbnail = ''",
+ queueName: 'generalQueue',
+ processorName: 'createVideoThumbnail',
+ logMessage: (id) => `findWork found ${id} in need of a thumbnail.`
+ });
+}
+
/**
* handleMissingMuxAsset
*
@@ -165,6 +194,7 @@ export async function findWork(job: Job) {
await handleMissingStreamDate(job, pb);
await handleMissingSourceVideo(job, pb);
await handleMissingMuxAsset(job, pb);
+ await handleMissingThumbnail(job, pb);
// findMissingThumbnail
diff --git a/services/worker/src/queues/b2Queue.ts b/services/worker/src/queues/b2Queue.ts
new file mode 100644
index 00000000..cd8510b1
--- /dev/null
+++ b/services/worker/src/queues/b2Queue.ts
@@ -0,0 +1,11 @@
+/**
+ * b2Queue for running long-running Backblaze tasks.
+ * Sometimes they take 3+ hours to do a server side bucket transfer and it holds up other tasks.
+ * Backblaze tasks get their own queue so as not to hold up other tasks.
+ */
+import { Queue, QueueEvents } from 'bullmq';
+import { connection } from '../../.config/bullmq.config';
+export const b2Queue = new Queue('b2Queue', { connection });
+export const b2QueueEvents = new QueueEvents("b2Queue", {
+ connection
+});
\ No newline at end of file
diff --git a/services/worker/src/queues/downloadQueue.ts b/services/worker/src/queues/downloadQueue.ts
index df6b4227..f302ea33 100644
--- a/services/worker/src/queues/downloadQueue.ts
+++ b/services/worker/src/queues/downloadQueue.ts
@@ -3,4 +3,6 @@ import { connection } from '../../.config/bullmq.config';
export const downloadQueue = new Queue('downloadQueue', { connection });
export const downloadQueueEvents = new QueueEvents("downloadQueue", {
connection
-});
\ No newline at end of file
+});
+
+await downloadQueue.setGlobalConcurrency(1);
\ No newline at end of file
diff --git a/services/worker/src/util/random.ts b/services/worker/src/util/random.ts
new file mode 100644
index 00000000..7c04212b
--- /dev/null
+++ b/services/worker/src/util/random.ts
@@ -0,0 +1,11 @@
+
+/**
+ * randomly shuffle an array
+ */
+export function shuffle(arr: T[]): T[] {
+ for (let i = arr.length - 1; i > 0; i--) {
+ const j = Math.floor(Math.random() * (i + 1));
+ [arr[i], arr[j]] = [arr[j], arr[i]];
+ }
+ return arr;
+}
\ No newline at end of file
diff --git a/services/worker/src/workers/b2Worker.ts b/services/worker/src/workers/b2Worker.ts
new file mode 100644
index 00000000..6d2d4996
--- /dev/null
+++ b/services/worker/src/workers/b2Worker.ts
@@ -0,0 +1,26 @@
+import { Worker } from 'bullmq';
+import { connection } from '../../.config/bullmq.config.ts';
+import { copyV1VideoToV3 } from '../processors/copyV1VideoToV3.ts';
+
+const workerName = 'b2Worker';
+const queueName = 'b2Queue';
+
+new Worker(
+ queueName,
+ async (job) => {
+ console.log(`${workerName}. we got a job on the ${queueName}. data=${JSON.stringify(job.data)}, job name=${job.name}`);
+ switch (job.name) {
+ case 'copyV1VideoToV3':
+ return await copyV1VideoToV3(job);
+
+ default:
+ throw new Error(`${workerName} Unknown job name: ${job.name}`);
+ }
+ },
+ {
+ connection,
+ concurrency: 3
+ }
+);
+
+console.log(`${workerName} is running...`);
diff --git a/services/worker/src/workers/generalWorker.ts b/services/worker/src/workers/generalWorker.ts
index 13aee5a9..3e7b8504 100644
--- a/services/worker/src/workers/generalWorker.ts
+++ b/services/worker/src/workers/generalWorker.ts
@@ -10,6 +10,7 @@ import { createTorrent } from '../processors/createTorrent.ts';
import { analyzeAudio } from '../processors/analyzeAudio.ts';
import { findWork } from '../processors/findWork.ts';
import { getAnnounceUrlDetails } from '../processors/getAnnounceUrlDetails.ts';
+import { createVideoThumbnail } from '../processors/createVideoThumbnail.ts';
new Worker(
'generalQueue',
@@ -43,6 +44,9 @@ new Worker(
case 'analyzeAudio':
return await analyzeAudio(job);
+ case 'createVideoThumbnail':
+ return await createVideoThumbnail(job);
+
default:
throw new Error(`Unknown job name: ${job.name}`);
}
diff --git a/services/worker/systemd/qbittorrent-nox.service b/services/worker/systemd/qbittorrent-nox.service
new file mode 100644
index 00000000..a43ab329
--- /dev/null
+++ b/services/worker/systemd/qbittorrent-nox.service
@@ -0,0 +1,15 @@
+[Unit]
+Description=qbittorrent-nox
+After=network.target
+
+[Service]
+Type=simple
+Restart=always
+RestartSec=5
+ExecStart=/home/cj/.local/bin/qbittorrent-nox --confirm-legal-notice --webui-port=8069 --profile=/home/cj/.config/futureporn/qbittorrent-nox
+WorkingDirectory=/home/cj/Documents/futureporn-monorepo/services/worker
+EnvironmentFile=/home/cj/Documents/futureporn-monorepo/services/worker/.env.production.local
+Restart=on-failure
+
+[Install]
+WantedBy=default.target
diff --git a/services/worker/systemd/up.sh b/services/worker/systemd/up.sh
index a3fd3545..369f9a5e 100755
--- a/services/worker/systemd/up.sh
+++ b/services/worker/systemd/up.sh
@@ -2,12 +2,14 @@
loginctl enable-linger
sudo cp worker.service /etc/systemd/user/worker.service
+sudo cp qbittorrent-nox.service /etc/systemd/user/worker.service
systemctl --user daemon-reload
systemctl --user restart worker
+systemctl --user restart qbittorrent-nox
systemctl --user enable worker
-systemctl --user status worker
+systemctl --user enable qbittorrent-nox
systemctl --user status worker
systemctl status valkey
|