rss torrent compatibility and thumbnail creation

commit a5433e7bd5 (parent 354ae0b73f)
@@ -1,6 +1,6 @@
 {
   "name": "futureporn",
-  "version": "3.5.1",
+  "version": "4.0.0",
   "private": true,
   "description": "Dedication to the preservation of lewdtuber history",
   "license": "Unlicense",
@@ -98,7 +98,7 @@ onFileDownloadRequest((event) => {
   // Then serve a 302 redirect instead of serving the file proxied thru PB

   const path = event.servedPath;
-  const expires = Math.round(Date.now() / 1000) + 3600;
+  const expires = Math.round(Date.now() / 1000) + 7 * 24 * 3600; // 7 days
   const signedUrl = signUrlCool(securityKey, baseUrl, path, rawQuery, expires);
   // console.log(`rawQUery`, rawQuery, 'path', path);
   // console.log(`signedUrl=${signedUrl}`);
@@ -79,6 +79,10 @@
   </p>
   <% } %>

+  <% if (data.vod?.get('torrent')) { %>
+    <p><b id="torrent">Torrent:</b> <a target="_blank" href="<%= data.vod?.get('torrent')%>">download</a></p>
+  <% } %>
+
   <% if (data.vod?.get('magnetLink')) { %>
     <p><b id="magnet-link">Magnet Link:</b> <a target="_blank" href="<%= data.vod?.get('magnetLink')%>"><span class="icon"><svg xmlns="http://www.w3.org/2000/svg" width="32" height="32" viewBox="0 0 32 32">
       <g fill="none">
@@ -6,7 +6,8 @@
   <th>Stream Date</th>
   <th>VTuber</th>
   <th>Thumbnail</th>
-  <th><abbr title="Download">DL</abbr></th>
+  <th>Torrent</th>
+  <th>Magnet Link</th>
   </tr>
   </thead>
   <tbody>
@@ -38,6 +39,18 @@
   <span>No thumbnail</span>
   <% } %>
   </td>
+  <td>
+    <% if (vod?.torrent) { %>
+
+      <!-- /api/files/collectionIdOrName/recordId/filename -->
+      <a target="_blank" href="/api/files/vods/<%= vod?.id %>/<%= vod?.torrent %>">
+        <span class="icon">
+          <svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24">
+            <path fill="currentColor" d="M5 20h14v-2H5zM19 9h-4V3H9v6H5l7 7z" />
+          </svg>
+        </span></a>
+    <% } %>
+  </td>
   <td>
   <% if (vod?.magnetLink) { %>
   <a target="_blank" href="<%= vod?.magnetLink %>">
@@ -62,12 +62,5 @@ function signUrl(url, securityKey, expirationTime = 103600, userIp, isDirectory
   }
 }

-const securityKey = 'd5814175-cc56-4098-ae63-1096301fb3c1';
-const sampleUrl = 'https://fppbdev.b-cdn.net/pbc_3872109612/z0bpy5cwxi1uksv/g4ot5_omb_qaei_o7_y_tuzdlcn1se.jpeg';
-// const expires = Math.round(Date.now() / 1000) + 3600;
-const expires = 3524904301;
-const signedUrl = signUrl(sampleUrl, securityKey, expires, null, false, '');
-
-console.log(`signedUrl=${signedUrl}`);

 module.exports = { signUrl };
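The lines removed above were ad-hoc smoke-test code left inside the module. If a quick manual check is still useful, it can live in a standalone script instead. A minimal sketch, assuming the module path and an environment variable for the key (both assumptions, not part of this commit):

```ts
// check-signed-url.ts (hypothetical helper script, not included in this commit)
import { signUrl } from './signUrl.js'; // path is an assumption

const securityKey = process.env.BUNNY_URL_TOKEN_KEY ?? ''; // assumed env var name
const sampleUrl = 'https://example.b-cdn.net/some/file.jpeg'; // placeholder URL

// Mirror the 7-day expiry the download handler now uses.
const expires = Math.round(Date.now() / 1000) + 7 * 24 * 3600;

// Argument order follows the existing signature:
// signUrl(url, securityKey, expirationTime, userIp, isDirectory, ...)
console.log(`signedUrl=${signUrl(sampleUrl, securityKey, expires, null, false, '')}`);
```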
@@ -1,15 +1,27 @@
 # worker

-To install dependencies:
+To install node dependencies:

 ```bash
-bun install
+npm install
 ```

 To run:

 ```bash
-bun run index.ts
+npm run start
 ```

-This project was created using `bun init` in bun v1.3.1. [Bun](https://bun.com) is a fast all-in-one JavaScript runtime.
+
+
+## External Dependencies
+
+Worker needs these packages installed in order to be successful. Please make sure these are installed in the environment.
+
+* vcsi
+* whisper-cli
+* yolo
+* ffmpeg
+* qbittorrent-nox
+* b2cli
+* valkey
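Since the worker now assumes these tools exist on the host rather than in package.json, a startup preflight check can fail fast when one is missing. A minimal sketch using nano-spawn (already used elsewhere in the worker); the names are copied from the list above and may need adjusting, for example the b2 CLI binary is usually `b2`, and valkey runs as a service rather than a CLI:

```ts
// preflight.ts (illustrative sketch, not part of this commit)
import spawn from 'nano-spawn';

// Names copied from the README list above; adjust to the actual binary names on the host.
const requiredBinaries = ['vcsi', 'whisper-cli', 'yolo', 'ffmpeg', 'qbittorrent-nox', 'b2cli'];

export async function assertExternalDependencies(): Promise<void> {
  const missing: string[] = [];
  for (const bin of requiredBinaries) {
    try {
      await spawn('which', [bin]); // a non-zero exit (binary not on PATH) rejects
    } catch {
      missing.push(bin);
    }
  }
  if (missing.length) {
    throw new Error(`Missing external dependencies: ${missing.join(', ')}`);
  }
}
```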
@@ -1,7 +1,6 @@
 import { createBullBoard } from '@bull-board/api';
 import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
 import { ExpressAdapter } from '@bull-board/express';
-import { type JobsOptions } from 'bullmq';
 import express, { type Request, type Response } from 'express';
 import { generalQueue } from './queues/generalQueue.ts';
 import { gpuQueue } from './queues/gpuQueue.ts';
@@ -11,6 +10,7 @@ import { version } from '../package.json';
 import { downloadQueue } from './queues/downloadQueue.ts';
 import { cacheQueue } from './queues/cacheQueue.ts';
 import { muxQueue } from './queues/muxQueue.ts';
+import { b2Queue } from './queues/b2Queue.ts';

 const run = async () => {

@@ -30,6 +30,7 @@ const run = async () => {
       new BullMQAdapter(downloadQueue),
       new BullMQAdapter(cacheQueue),
       new BullMQAdapter(muxQueue),
+      new BullMQAdapter(b2Queue),
     ],
     serverAdapter,
   });
@@ -3,7 +3,7 @@ import fs from "node:fs/promises";
 import { join } from "node:path";
 import env from "../../.config/env";

-const retainmentDayCount = 90;
+const retainmentDayCount = 2;

 /**
  * cacheCleanup
@@ -20,16 +20,13 @@



-import env from "../../.config/env";
 import { sshClient } from "../util/sftp";
 import { qbtClient, QBTorrentInfo } from "../util/qbittorrent";
 import { Job } from "bullmq";
 import { getPocketBaseClient } from "../util/pocketbase";
-import spawn from "nano-spawn";
-import { join, basename } from 'node:path';
-import { tmpdir } from "node:os";
-import { nanoid } from "nanoid";
+import { basename } from 'node:path';
 import { cacheQueue, cacheQueueEvents } from "../queues/cacheQueue";
+import { readFile } from "node:fs/promises";



@@ -211,12 +208,12 @@ export async function createTorrent(job: Job) {
   const cacheGetJob = await cacheQueue.add(
     'cacheGet',
     { vodId },
-    { jobId: `cache-${vodId}` }
+    { jobId: `cacheGet-${vodId}` }
   );


-  // 4. wait up to 3 hours for download to finish
-  const results = (await cacheGetJob.waitUntilFinished(cacheQueueEvents, 1000 * 60 * 60 * 3));
+  // 4. wait up to 5 hours for download to finish
+  const results = (await cacheGetJob.waitUntilFinished(cacheQueueEvents, 1000 * 60 * 60 * 5));
   await job.log(`cacheGet results: ${JSON.stringify(results)}`);
   const { cachePath } = results;

@@ -230,10 +227,18 @@ export async function createTorrent(job: Job) {
   await job.log(`great! torrent created at ${torrentFilePath}. Now let's upload that torrent and the VOD to the seedbox. This will take some time...`);
   await uploadTorrentToSeedbox(job, cachePath, torrentFilePath);

-  job.log(`updating vod record...`);
-  await pb.collection('vods').update(vod.id, {
-    magnetLink
-  });
+  job.log(`Updating vod record in the db. This involes sending a multipart/form to pocketbase with the .torrent file as a buffer and the magnetLink as text...`);
+  const formData = new FormData();
+
+  const torrentBuffer = await readFile(torrentFilePath)
+  formData.append('torrent', new Blob([torrentBuffer]), basename(torrentFilePath));
+  formData.append('magnetLink', magnetLink);
+
+  await pb.collection('vods').update(vod.id, formData);
+
+
+
   job.log(`Torrent creation complete.`);
   await job.updateProgress(100);
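Once the multipart update above succeeds, PocketBase stores the uploaded .torrent under a generated filename on the record's `torrent` field. A short sketch of resolving that into a download URL, following the `/api/files/{collection}/{recordId}/{filename}` pattern noted in the templates earlier in this commit (the helper below is illustrative; `pb.baseUrl` pointing at the PocketBase instance is an assumption):

```ts
import { getPocketBaseClient } from "../util/pocketbase";

// Sketch: resolve the stored .torrent filename into a public download URL.
export async function getTorrentUrl(vodId: string): Promise<string> {
  const pb = await getPocketBaseClient();
  const vod = await pb.collection('vods').getOne(vodId);
  return `${pb.baseUrl}/api/files/vods/${vod.id}/${vod.torrent}`;
}
```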
@@ -1,33 +1,73 @@
-import type { Helpers } from "graphile-worker";
-import { PrismaClient } from "../../generated/prisma";
-import { withAccelerate } from "@prisma/extension-accelerate";
-import { getOrDownloadAsset } from "../utils/cache";
-import { env } from "../config/env";
-import { getS3Client, uploadFile } from "../utils/s3";
-import { nanoid } from "nanoid";
-import { getNanoSpawn } from "../utils/nanoSpawn";
-import { generateS3Path } from "../utils/formatters";
-import logger from "../utils/logger";
-import { preparePython } from "../utils/python";
-
-const prisma = new PrismaClient().$extends(withAccelerate());
+import env from "../../.config/env";
+import { Job } from "bullmq";
+import { getPocketBaseClient } from "../util/pocketbase";
+import spawn from 'nano-spawn';
+import { readFile } from "fs/promises";
+import { basename } from "node:path";
+import { cacheQueue, cacheQueueEvents } from '../queues/cacheQueue';


 interface Payload {
   vodId: string;
 }

-async function createThumbnail(helpers: Helpers, inputFilePath: string) {
-  logger.debug(`createThumbnail with inputFilePath=${inputFilePath}`)
-
+function assertPayload(payload: any): asserts payload is Payload {
+  if (typeof payload !== "object" || !payload) throw new Error("invalid payload-- was not an object.");
+  if (typeof payload.vodId !== "string") throw new Error("invalid payload-- was missing vodId");
+}
+
+
+/**
+ *
+ * createVideoThumbnail
+ *
+ * uses vcsi to create a 5x5 grid of video frames
+ */
+export async function createVideoThumbnail(job: Job) {
+
+  assertPayload(job.data);
+  const vodId = job.data.vodId;
+  const pb = await getPocketBaseClient();
+  const vod = await pb.collection('vods').getOne(vodId);
+
+  job.log(`createVideoThumbnail for ${vodId} starting.`);
+
+  job.log(`pulling sourceVideo from cache...`);
+
+  const cacheJob = await cacheQueue.add(
+    'cacheGet',
+    { vodId },
+    { jobId: `cacheGet-${vodId}` }
+  );
+
+  // 4. wait for cache/download to finish
+  const result = await cacheJob.waitUntilFinished(cacheQueueEvents, 1000 * 60 * 60 * 3);
+  const cachePath = result.cachePath;
+
+  // 5. create thumbnail
+  const thumbnailFilePath = await __createThumbnail(cachePath);
+
+  // 6. update db record
+  const formData = new FormData();
+
+  const thumbnailBuffer = await readFile(thumbnailFilePath)
+  formData.append('thumbnail', new Blob([thumbnailBuffer]), basename(thumbnailFilePath));
+  formData.append('magnetLink', magnetLink);
+  await pb.collection('vods').update(vod.id, formData);
+
+}
+
+
+async function __createThumbnail(inputFilePath: string) {
   if (!inputFilePath) {
     throw new Error("inputFilePath is missing");
   }

   const outputFilePath = inputFilePath.replace(/\.[^/.]+$/, '') + '-thumb.png';
-  const spawn = await getNanoSpawn();
-  const result = await spawn('vcsi', [
+  await spawn('vcsi', [
     inputFilePath,
     '--metadata-position', 'hidden',
     '--metadata-margin', '0',
@@ -46,81 +86,9 @@ async function createThumbnail(helpers: Helpers, inputFilePath: string) {
   ], {
     stdout: 'inherit',
     stderr: 'inherit',
-    cwd: env.APP_DIR,
+    cwd: env.CACHE_ROOT,
   });

-  logger.debug('result as follows')
-  logger.debug(JSON.stringify(result, null, 2))
-
-  logger.info(`✅ Thumbnail saved to: ${outputFilePath}`);
   return outputFilePath

 }
-
-
-function assertPayload(payload: any): asserts payload is Payload {
-  if (typeof payload !== "object" || !payload) throw new Error("invalid payload-- was not an object.");
-  if (typeof payload.vodId !== "string") throw new Error("invalid payload-- was missing vodId");
-}
-
-
-export default async function createVideoThumbnail(payload: any, helpers: Helpers) {
-  assertPayload(payload)
-  const { vodId } = payload
-  const vod = await prisma.vod.findFirstOrThrow({
-    where: {
-      id: vodId
-    },
-    include: {
-      vtubers: {
-        select: {
-          slug: true,
-          id: true,
-        }
-      }
-    }
-  })
-  // * [x] load vod
-
-
-  // * [x] exit if video.thumbnail already defined
-  if (vod.thumbnail) {
-    logger.info(`Doing nothing-- vod ${vodId} already has a thumbnail.`)
-    return; // Exit the function early
-  }
-
-  if (!vod.sourceVideo) {
-    throw new Error(`Failed to create thumbnail-- vod ${vodId} is missing a sourceVideo.`);
-  }
-
-
-  logger.info('Creating Video Thumbnail')
-  const s3Client = getS3Client()
-
-  // * [x] download video segments from pull-thru cache
-  const videoFilePath = await getOrDownloadAsset(s3Client, env.S3_BUCKET, vod.sourceVideo)
-  logger.debug(`videoFilePath=${videoFilePath}`)
-
-  // * [x] run vcsi
-  const thumbnailPath = await createThumbnail(helpers, videoFilePath)
-  logger.debug(`thumbnailPath=${thumbnailPath}`)
-
-  // * [x] generate thumbnail s3 key
-  const slug = vod.vtubers[0].slug
-  if (!slug) throw new Error(`vtuber ${vod.vtubers[0].id} was missing slug`);
-  const s3Key = generateS3Path(slug, vod.streamDate, vod.id, `thumbnail.png`);
-
-
-  // * [x] upload thumbnail to s3
-  await uploadFile(s3Client, env.S3_BUCKET, s3Key, thumbnailPath, 'image/png')
-
-  // * [x] update vod record
-  await prisma.vod.update({
-    where: { id: vodId },
-    data: { thumbnail: s3Key }
-  });
-
-  // * [x] done
-
-
-}
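For context on the `__createThumbnail` helper above: the doc comment says vcsi builds a 5x5 grid, but the hunk only shows part of the argument list. Below is a sketch of a comparable invocation; the grid and output flags (`-g`, `-o`) are assumptions about vcsi's CLI, and the repo's actual arguments may differ:

```ts
import spawn from 'nano-spawn';

// Illustrative only; flags beyond --metadata-position / --metadata-margin
// (which appear in the diff) are assumptions about vcsi's CLI.
async function makeContactSheet(inputFilePath: string): Promise<string> {
  const outputFilePath = inputFilePath.replace(/\.[^/.]+$/, '') + '-thumb.png';
  await spawn('vcsi', [
    inputFilePath,
    '--metadata-position', 'hidden',
    '--metadata-margin', '0',
    '-g', '5x5',           // 5x5 grid of frames
    '-o', outputFilePath,  // write the contact sheet next to the source video
  ], { stdout: 'inherit', stderr: 'inherit' });
  return outputFilePath;
}
```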
@@ -1,12 +1,15 @@
 import { Job, Queue } from "bullmq";
-import { getPocketBaseClient } from "../util/pocketbase";
 import Client from "pocketbase";
-import { generalQueue } from "../queues/generalQueue";
-import { muxQueue } from "../queues/muxQueue";
+import { getPocketBaseClient } from "../util/pocketbase.ts";
+import { generalQueue } from "../queues/generalQueue.ts";
+import { muxQueue } from "../queues/muxQueue.ts";
+import { b2Queue } from "../queues/b2Queue.ts";
+import { shuffle } from "../util/random.ts";

 const queues: Record<string, Queue> = {
   generalQueue: generalQueue,
   muxQueue: muxQueue,
+  b2Queue: b2Queue,
 };

 type VodJobConfig = {
@@ -17,23 +20,40 @@ type VodJobConfig = {
 };

 async function handleMissing(job: Job, pb: Client, config: VodJobConfig) {
-  const results = await pb.collection('vods').getList(1, 1, {
+
+  // Sometimes, we can run into a softlock state.
+  // If procesing a vod repeatedly fails, findWork will fail to queue new jobs because of task deduplication.
+  // @see https://docs.bullmq.io/patterns/throttle-jobs
+  // To overcome this, we randomly choose a vod from a list of the latest 3.
+  // If one vod gets permafailed, we have 2 more to continue working.
+  // This is an imperfect solution because the one permafailed vod must be rectified by an admin.
+  // @todo figure out a better way to handle permafailed vod processing tasks that isn't a ticking timebomb.
+  const results = await pb.collection('vods').getList(1, 3, {
     filter: config.filter,
     sort: '-created',
   });

   const vods = results.items;
-  if (!vods.length) return; // nothing to do
+  if (!vods.length) {
+    // job.log(`No vods matching filter [${config.filter}]. Nothing to do.`)
+    return;
+  }

-  const vod = vods[0];
+  const vod = shuffle(vods).at(0);
+  if (!vod) {
+    throw new Error('no vod found after shuffling');
+  };
   const vodId = vod.id;

   job.log(config.logMessage(vodId));

   const jobId = `${config.processorName}-${vodId}`;
+  const attempts = 3;

-  const queue = queues[config.queueName]; // <-- look here
-  await queue.add(config.processorName, { vodId }, { jobId });
+  const queue = queues[config.queueName];
+  await queue.add(config.processorName, { vodId }, { jobId, attempts });
 }

 // export async function handleMissingTorrent(job: Job, pb: Client) {
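The comment block above hinges on how BullMQ treats custom job ids: as long as a job with the same `jobId` still exists in the queue (including in the failed set), a later `add()` with that id is ignored rather than re-queued. A minimal sketch of the behaviour findWork is working around (queue and processor names mirror this commit; the Redis connection is omitted):

```ts
import { Queue } from 'bullmq';

const queue = new Queue('generalQueue'); // sketch only; the real code passes a connection

const vodId = 'abc123'; // hypothetical record id

// First add creates the job.
await queue.add('createVideoThumbnail', { vodId }, { jobId: `createVideoThumbnail-${vodId}`, attempts: 3 });

// If that job permafails and is never removed, this second add is a no-op,
// which is why handleMissing now samples one of the three newest matching vods.
await queue.add('createVideoThumbnail', { vodId }, { jobId: `createVideoThumbnail-${vodId}`, attempts: 3 });
```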
@@ -125,12 +145,21 @@ export async function handleMissingStreamDate(job: Job, pb: Client) {
 export async function handleMissingSourceVideo(job: Job, pb: Client) {
   return handleMissing(job, pb, {
     filter: "videoSrcB2 != '' && sourceVideo = ''",
-    queueName: 'generalQueue',
+    queueName: 'b2Queue',
     processorName: 'copyV1VideoToV3',
     logMessage: (id) => `findWork found ${id} in need of a source video.`
   });
 }

+export async function handleMissingThumbnail(job: Job, pb: Client) {
+  return handleMissing(job, pb, {
+    filter: "sourceVideo != '' && thumbnail = ''",
+    queueName: 'generalQueue',
+    processorName: 'createVideoThumbnail',
+    logMessage: (id) => `findWork found ${id} in need of a thumbnail.`
+  });
+}
+
 /**
  * handleMissingMuxAsset
  *
@@ -165,6 +194,7 @@ export async function findWork(job: Job) {
   await handleMissingStreamDate(job, pb);
   await handleMissingSourceVideo(job, pb);
   await handleMissingMuxAsset(job, pb);
+  await handleMissingThumbnail(job, pb);


   // findMissingThumbnail
services/worker/src/queues/b2Queue.ts (new file, 11 lines)
@@ -0,0 +1,11 @@
+/**
+ * b2Queue for running long-running Backblaze tasks.
+ * Sometimes they take 3+ hours to do a server side bucket transfer and it holds up other tasks.
+ * Backblaze tasks get their own queue so as not to hold up other tasks.
+ */
+import { Queue, QueueEvents } from 'bullmq';
+import { connection } from '../../.config/bullmq.config';
+export const b2Queue = new Queue('b2Queue', { connection });
+export const b2QueueEvents = new QueueEvents("b2Queue", {
+  connection
+});
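A brief usage note, assuming the wiring added later in this commit: jobs added to this queue are consumed by b2Worker, so multi-hour Backblaze copies no longer tie up generalQueue workers. For example:

```ts
import { b2Queue } from './b2Queue.ts';

// Hypothetical call site; the job name matches what b2Worker handles in this commit.
const vodId = 'abc123';
await b2Queue.add('copyV1VideoToV3', { vodId }, { jobId: `copyV1VideoToV3-${vodId}`, attempts: 3 });
```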
@@ -4,3 +4,5 @@ export const downloadQueue = new Queue('downloadQueue', { connection });
 export const downloadQueueEvents = new QueueEvents("downloadQueue", {
   connection
 });
+
+await downloadQueue.setGlobalConcurrency(1);
services/worker/src/util/random.ts (new file, 11 lines)
@@ -0,0 +1,11 @@
+
+/**
+ * randomly shuffle an array
+ */
+export function shuffle<T>(arr: T[]): T[] {
+  for (let i = arr.length - 1; i > 0; i--) {
+    const j = Math.floor(Math.random() * (i + 1));
+    [arr[i], arr[j]] = [arr[j], arr[i]];
+  }
+  return arr;
+}
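Note that this is an in-place Fisher-Yates shuffle: it reorders and returns the same array, which is fine for the throwaway result lists findWork passes in. A quick usage sketch:

```ts
import { shuffle } from './random.ts';

const vods = [{ id: 'a' }, { id: 'b' }, { id: 'c' }];

// The input array itself is reordered; copy first ([...vods]) if the original order matters.
const pick = shuffle(vods).at(0);
console.log(pick?.id);
```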
services/worker/src/workers/b2Worker.ts (new file, 26 lines)
@@ -0,0 +1,26 @@
+import { Worker } from 'bullmq';
+import { connection } from '../../.config/bullmq.config.ts';
+import { copyV1VideoToV3 } from '../processors/copyV1VideoToV3.ts';
+
+const workerName = 'b2Worker';
+const queueName = 'b2Queue';
+
+new Worker(
+  queueName,
+  async (job) => {
+    console.log(`${workerName}. we got a job on the ${queueName}. data=${JSON.stringify(job.data)}, job name=${job.name}`);
+    switch (job.name) {
+      case 'copyV1VideoToV3':
+        return await copyV1VideoToV3(job);
+
+      default:
+        throw new Error(`${workerName} Unknown job name: ${job.name}`);
+    }
+  },
+  {
+    connection,
+    concurrency: 3
+  }
+);
+
+console.log(`${workerName} is running...`);
@@ -10,6 +10,7 @@ import { createTorrent } from '../processors/createTorrent.ts';
 import { analyzeAudio } from '../processors/analyzeAudio.ts';
 import { findWork } from '../processors/findWork.ts';
 import { getAnnounceUrlDetails } from '../processors/getAnnounceUrlDetails.ts';
+import { createVideoThumbnail } from '../processors/createVideoThumbnail.ts';

 new Worker(
   'generalQueue',
@@ -43,6 +44,9 @@
       case 'analyzeAudio':
         return await analyzeAudio(job);

+      case 'createVideoThumbnail':
+        return await createVideoThumbnail(job);
+
       default:
         throw new Error(`Unknown job name: ${job.name}`);
     }
services/worker/systemd/qbittorrent-nox.service (new file, 15 lines)
@@ -0,0 +1,15 @@
+[Unit]
+Description=qbittorrent-nox
+After=network.target
+
+[Service]
+Type=simple
+Restart=always
+RestartSec=5
+ExecStart=/home/cj/.local/bin/qbittorrent-nox --confirm-legal-notice --webui-port=8069 --profile=/home/cj/.config/futureporn/qbittorrent-nox
+WorkingDirectory=/home/cj/Documents/futureporn-monorepo/services/worker
+EnvironmentFile=/home/cj/Documents/futureporn-monorepo/services/worker/.env.production.local
+Restart=on-failure
+
+[Install]
+WantedBy=default.target
@@ -2,12 +2,14 @@

 loginctl enable-linger
 sudo cp worker.service /etc/systemd/user/worker.service
+sudo cp qbittorrent-nox.service /etc/systemd/user/qbittorrent-nox.service


 systemctl --user daemon-reload
 systemctl --user restart worker
+systemctl --user restart qbittorrent-nox
 systemctl --user enable worker
-systemctl --user status worker
+systemctl --user enable qbittorrent-nox
 systemctl --user status worker
 systemctl status valkey
