add download cache
This commit is contained in:
parent
87b054e66f
commit
72c607266f
10
.vscode/tasks.json
vendored
10
.vscode/tasks.json
vendored
@ -18,6 +18,16 @@
|
||||
"problemMatcher": [],
|
||||
"isBackground": true
|
||||
},
|
||||
{
|
||||
"label": "Run sftp dev server",
|
||||
"type": "shell",
|
||||
"command": "docker run --name=futureporn-sftp-DEV --rm -it -v /home/cj/Documents/futureporn-monorepo/services/worker/src/fixtures/sftp:/home/fp -p 2222:22 atmoz/sftp fp:password:1000",
|
||||
"problemMatcher": [],
|
||||
"isBackground": true,
|
||||
"runOptions": {
|
||||
"runOn": "folderOpen"
|
||||
}
|
||||
},
|
||||
{
|
||||
"label": "Run postgres",
|
||||
"type": "shell",
|
||||
|
||||
@ -78,7 +78,12 @@
|
||||
<% } %>
|
||||
|
||||
<% if (data.vod?.get('magnetLink')) { %>
|
||||
<p><b id="magnet-link">Magnet Link:</b> <%= data.vod?.get('magnetLink') %></p>
|
||||
<p><b id="magnet-link">Magnet Link:</b> <a target="_blank" href="<%= data.vod?.get('magnetLink')%>"><span class="icon"><svg xmlns="http://www.w3.org/2000/svg" width="32" height="32" viewBox="0 0 32 32">
|
||||
<g fill="none">
|
||||
<path fill="#d3d3d3" d="M11 23v6.06c0 .52-.42.94-.94.94H3.94c-.52 0-.94-.42-.94-.94V23l4.028-2.152zm18 0v6.06c0 .52-.42.94-.94.94h-6.12c-.52 0-.94-.42-.94-.94V23l3.99-2.152z" />
|
||||
<path fill="#f8312f" d="M11 23v-7.94c0-2.75 2.2-5.04 4.95-5.06c2.78-.03 5.05 2.23 5.05 5v8h8v-8c0-7.18-5.82-13-13-13S3 7.82 3 15v8z" />
|
||||
</g>
|
||||
</svg></span></a></p>
|
||||
<% } %>
|
||||
|
||||
<% if (data.vod?.get('notes')) { %>
|
||||
|
||||
11
services/pocketbase/pb_hooks/pages/vt/+middleware.js
Normal file
11
services/pocketbase/pb_hooks/pages/vt/+middleware.js
Normal file
@ -0,0 +1,11 @@
|
||||
|
||||
/**
|
||||
This directory is solely for redirecting futureporn v2 paths
|
||||
|
||||
ex: /vt/el_xox/vod/20240317T214358Z redirects to /vods/kaovo483w4f4rc1
|
||||
*/
|
||||
|
||||
|
||||
module.exports = function ({ redirect }, next) {
|
||||
next()
|
||||
}
|
||||
@ -0,0 +1,5 @@
|
||||
<script server>
|
||||
redirect('/vods/2rrfsm74d4xv4ez', {
|
||||
status: 308
|
||||
})
|
||||
</script>
|
||||
@ -0,0 +1,5 @@
|
||||
<script server>
|
||||
redirect('/vods/g2qvnxnlfo24dra', {
|
||||
status: 308
|
||||
})
|
||||
</script>
|
||||
@ -0,0 +1,5 @@
|
||||
<script server>
|
||||
redirect('/vods/q8e7ur502bzrp7l', {
|
||||
status: 308
|
||||
})
|
||||
</script>
|
||||
@ -0,0 +1,5 @@
|
||||
<script server>
|
||||
redirect('/vods/4csbaj06u481x9t', {
|
||||
status: 308
|
||||
})
|
||||
</script>
|
||||
@ -0,0 +1,5 @@
|
||||
<script server>
|
||||
redirect('/vods/kaovo483w4f4rc1', {
|
||||
status: 308
|
||||
})
|
||||
</script>
|
||||
@ -0,0 +1,5 @@
|
||||
<script server>
|
||||
redirect('/vods/35baz6ab6pnvmdk', {
|
||||
status: 308
|
||||
})
|
||||
</script>
|
||||
@ -0,0 +1,5 @@
|
||||
<script server>
|
||||
redirect('/vods/k7efwbyrmgvfxan', {
|
||||
status: 308
|
||||
})
|
||||
</script>
|
||||
@ -0,0 +1,5 @@
|
||||
<script server>
|
||||
redirect('/vods/ey127duzwhmkhvj', {
|
||||
status: 308
|
||||
})
|
||||
</script>
|
||||
@ -0,0 +1,5 @@
|
||||
<script server>
|
||||
redirect('/vods/lpi5e30klfxl378', {
|
||||
status: 308
|
||||
})
|
||||
</script>
|
||||
@ -28,14 +28,15 @@ const env = (() => {
|
||||
if (!process.env.APIFY_TOKEN) throw new Error('APIFY_TOKEN missing in env');
|
||||
if (!process.env.NODE_ENV) throw new Error('APIFY_TOKEN missing in env');
|
||||
if (!process.env.CACHE_ROOT) throw new Error('CACHE_ROOT missing in env');
|
||||
if (!process.env.SEEDBOX_SFTP_URL) throw new Error('SEEDBOX_SFTP_URL missing in env');
|
||||
if (!process.env.SEEDBOX_SFTP_HOST) throw new Error('SEEDBOX_SFTP_HOST missing in env');
|
||||
if (!process.env.SEEDBOX_SFTP_PORT) throw new Error('SEEDBOX_SFTP_PORT missing in env');
|
||||
if (!process.env.SEEDBOX_SFTP_USERNAME) throw new Error('SEEDBOX_SFTP_USERNAME missing in env');
|
||||
if (!process.env.SEEDBOX_SFTP_PASSWORD) throw new Error('SEEDBOX_SFTP_PASSWORD missing in env');
|
||||
if (!process.env.QBT_HOST) throw new Error('QBT_HOST missing in env');
|
||||
if (!process.env.QBT_PORT) throw new Error('QBT_PORT missing in env');
|
||||
if (!process.env.QBT_PASSWORD) throw new Error('QBT_PASSWORD missing in env');
|
||||
if (!process.env.QBT_USERNAME) throw new Error('QBT_USERNAME missing in env');
|
||||
|
||||
if (!process.env.WORKER_WORKERS) throw new Error('WORKER_WORKERS missing in env');
|
||||
|
||||
const {
|
||||
PORT,
|
||||
@ -65,7 +66,8 @@ const env = (() => {
|
||||
APIFY_TOKEN,
|
||||
NODE_ENV,
|
||||
CACHE_ROOT,
|
||||
SEEDBOX_SFTP_URL,
|
||||
SEEDBOX_SFTP_HOST,
|
||||
SEEDBOX_SFTP_PORT,
|
||||
SEEDBOX_SFTP_USERNAME,
|
||||
SEEDBOX_SFTP_PASSWORD,
|
||||
VALKEY_PORT,
|
||||
@ -73,6 +75,7 @@ const env = (() => {
|
||||
QBT_USERNAME,
|
||||
QBT_PASSWORD,
|
||||
QBT_PORT,
|
||||
WORKER_WORKERS,
|
||||
} = process.env
|
||||
return {
|
||||
PORT,
|
||||
@ -102,7 +105,8 @@ const env = (() => {
|
||||
APIFY_TOKEN,
|
||||
NODE_ENV,
|
||||
CACHE_ROOT,
|
||||
SEEDBOX_SFTP_URL,
|
||||
SEEDBOX_SFTP_HOST,
|
||||
SEEDBOX_SFTP_PORT,
|
||||
SEEDBOX_SFTP_USERNAME,
|
||||
SEEDBOX_SFTP_PASSWORD,
|
||||
VALKEY_PORT,
|
||||
@ -110,6 +114,7 @@ const env = (() => {
|
||||
QBT_USERNAME,
|
||||
QBT_PASSWORD,
|
||||
QBT_PORT,
|
||||
WORKER_WORKERS,
|
||||
}
|
||||
})()
|
||||
|
||||
|
||||
@ -1,2 +0,0 @@
|
||||
{
|
||||
}
|
||||
@ -1,2 +0,0 @@
|
||||
{
|
||||
}
|
||||
@ -1,2 +0,0 @@
|
||||
{
|
||||
}
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -8,6 +8,8 @@ import { gpuQueue } from './queues/gpuQueue.ts';
|
||||
import { highPriorityQueue } from './queues/highPriorityQueue.ts';
|
||||
import env from '../.config/env.ts';
|
||||
import { version } from '../package.json';
|
||||
import { downloadQueue } from './queues/downloadQueue.ts';
|
||||
import { cacheQueue } from './queues/cacheQueue.ts';
|
||||
|
||||
const run = async () => {
|
||||
|
||||
@ -24,22 +26,16 @@ const run = async () => {
|
||||
new BullMQAdapter(highPriorityQueue),
|
||||
new BullMQAdapter(generalQueue),
|
||||
new BullMQAdapter(gpuQueue),
|
||||
new BullMQAdapter(downloadQueue),
|
||||
new BullMQAdapter(cacheQueue),
|
||||
],
|
||||
serverAdapter,
|
||||
});
|
||||
|
||||
console.log('importing workers');
|
||||
if (process.env.NODE_ENV === 'development') {
|
||||
await import('./workers/highPriorityWorker.ts');
|
||||
await import('./workers/generalWorker.ts');
|
||||
await import('./workers/gpuWorker.ts');
|
||||
} else {
|
||||
// @todo I separated these so that they can be ran on multiple machines.
|
||||
// @todo we should activate these based on environment variable feature flags, not NODE_ENV
|
||||
await import('./workers/highPriorityWorker.ts');
|
||||
await import('./workers/generalWorker.ts');
|
||||
// await import('./workers/gpuWorker.ts'); // @todo implement
|
||||
}
|
||||
process.env.WORKER_WORKERS?.split(',').forEach(async (worker) => {
|
||||
console.log(`👷 Importing ${worker}`);
|
||||
await import(`./workers/${worker}.ts`);
|
||||
})
|
||||
|
||||
app.get('/', (req: Request, res: Response) => {
|
||||
res.send(`
|
||||
@ -74,8 +70,10 @@ const run = async () => {
|
||||
<li><a href="/ui">Bull Dashboard</a></li>
|
||||
<li><a href="/task?name=presignMuxAssets">Task: presignMuxAssets</a></li>
|
||||
<li><a href="/task?name=copyV1VideoAll">Task: copyV1VideoAll</a></li>
|
||||
<li><a href="/task?name=copyV1VideoToV3&vodId=1234">Task: copyV1VideoToV3</a></li>
|
||||
<li><a href="/task?name=createTorrent&vodId=1234">Task: createTorrent</a></li>
|
||||
<li><a href="/task?name=createMuxAsset&vodId=">Task: createMuxAsset</a></li>
|
||||
<li><a href="/task?name=cacheGet&vodId=1234">Task: cacheGet</a></li>
|
||||
</ul>
|
||||
`)
|
||||
})
|
||||
@ -86,7 +84,7 @@ const run = async () => {
|
||||
const name = req.query.name as string;
|
||||
const vodId = req.query.vodId as string;
|
||||
// console.log('vodId', vodId, 'name', name);
|
||||
// console.log(JSON.stringify(req.query, null, 2))
|
||||
// console.log(JSON.stringify(req.query, null, 2));
|
||||
|
||||
const data = { vodId };
|
||||
|
||||
@ -96,6 +94,12 @@ const run = async () => {
|
||||
case 'scheduleVodProcessing':
|
||||
await gpuQueue.add(name, data);
|
||||
break;
|
||||
case 'cacheGet':
|
||||
await cacheQueue.add(name, data);
|
||||
break;
|
||||
case 'download':
|
||||
await downloadQueue.add(name, data);
|
||||
break;
|
||||
default:
|
||||
await highPriorityQueue.add(name, data);
|
||||
break;
|
||||
|
||||
@ -3,9 +3,17 @@ import { getPocketBaseClient } from "../util/pocketbase";
|
||||
import Client from "pocketbase";
|
||||
import { Vod } from "../types";
|
||||
|
||||
interface Payload {
|
||||
vodId: string;
|
||||
}
|
||||
|
||||
const foo = 'bar';
|
||||
|
||||
function assertPayload(payload: any): asserts payload is Payload {
|
||||
if (typeof payload !== "object" || !payload) throw new Error("invalid payload-- was not an object.");
|
||||
if (typeof payload.vodId !== "string") throw new Error("invalid payload-- was missing vodId");
|
||||
}
|
||||
|
||||
/**
|
||||
* barFunction
|
||||
*
|
||||
@ -36,17 +44,18 @@ async function getApplicableVods(pb: Client) {
|
||||
*
|
||||
* A template to copy for when we make new processors. Shows the general scaffolding.
|
||||
*
|
||||
* Remember to makes processors
|
||||
* Remember to make processors
|
||||
* * idempotent
|
||||
* * fail fast
|
||||
* * DRY
|
||||
*/
|
||||
export async function aTemplate(job: Job) {
|
||||
export default async function aTemplate(job: Job) {
|
||||
|
||||
assertPayload(job.data);
|
||||
const pb = await getPocketBaseClient();
|
||||
const vods = await getApplicableVods(pb);
|
||||
|
||||
job.log(`getAnnounceUrlDetails found ${vods.length} vods in need of a streamDate.`)
|
||||
job.log(`_Template found ${vods.length} vods in need of a streamDate.`)
|
||||
|
||||
for (let i = 0; i < vods.length; i++) {
|
||||
const vod = vods[i] as unknown as Vod;
|
||||
|
||||
44
services/worker/src/processors/cacheCleanup.ts
Normal file
44
services/worker/src/processors/cacheCleanup.ts
Normal file
@ -0,0 +1,44 @@
|
||||
import { Job } from "bullmq";
|
||||
import fs from "node:fs/promises";
|
||||
import path from "node:path";
|
||||
import env from "../../.config/env";
|
||||
|
||||
const retainmentDayCount = 7;
|
||||
|
||||
/**
|
||||
* cacheCleanup
|
||||
*
|
||||
* Deletes files in the cache directory that are older than retainmentDayCount days
|
||||
*/
|
||||
export default async function cacheCleanup(job: Job) {
|
||||
const cacheDir = env.CACHE_ROOT;
|
||||
let cleanedCount = 0;
|
||||
|
||||
try {
|
||||
// read all files in the cache directory
|
||||
const files = await fs.readdir(cacheDir);
|
||||
|
||||
const now = Date.now();
|
||||
const retainMs = retainmentDayCount * 24 * 60 * 60 * 1000; // days → ms
|
||||
|
||||
for (const file of files) {
|
||||
const filePath = path.join(cacheDir, file);
|
||||
try {
|
||||
const stat = await fs.stat(filePath);
|
||||
// only delete files older than retainment
|
||||
if (now - stat.mtimeMs > retainMs) {
|
||||
await fs.unlink(filePath);
|
||||
cleanedCount++;
|
||||
}
|
||||
} catch (err) {
|
||||
// skip errors per-file, but log them
|
||||
job.log(`failed to check/delete ${file}: ${(err as Error).message}`);
|
||||
}
|
||||
}
|
||||
|
||||
job.log(`cleaned ${cleanedCount} stale cache files`);
|
||||
} catch (err) {
|
||||
job.log(`cache cleanup failed: ${(err as Error).message}`);
|
||||
throw err; // allow BullMQ to handle retry/failure
|
||||
}
|
||||
}
|
||||
143
services/worker/src/processors/cacheGet.ts
Normal file
143
services/worker/src/processors/cacheGet.ts
Normal file
@ -0,0 +1,143 @@
|
||||
import { Job } from "bullmq";
|
||||
import { getPocketBaseClient } from "../util/pocketbase";
|
||||
import env from "../../.config/env";
|
||||
import { join, extname, dirname } from "node:path";
|
||||
import spawn from "nano-spawn";
|
||||
import { stat, mkdir, utimes } from "node:fs/promises";
|
||||
import { downloadQueue, downloadQueueEvents } from "../queues/downloadQueue";
|
||||
|
||||
interface Payload {
|
||||
vodId: string;
|
||||
}
|
||||
|
||||
interface FileInfo {
|
||||
sha1: string | 'none'; // annoying 'none' has to be dealt with
|
||||
size: number;
|
||||
contentType: string;
|
||||
fileId: string;
|
||||
}
|
||||
|
||||
const cacheRoot = env.CACHE_ROOT;
|
||||
|
||||
function assertPayload(payload: any): asserts payload is Payload {
|
||||
if (typeof payload !== "object" || !payload) throw new Error("invalid payload-- was not an object.");
|
||||
if (typeof payload.vodId !== "string") throw new Error("invalid payload-- was missing vodId");
|
||||
}
|
||||
|
||||
|
||||
async function fileExists(path: string) {
|
||||
try {
|
||||
return (await stat(path)).isFile();
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches metadata for a B2 object using the CLI.
|
||||
* Uses: `b2 file info b2://<key>`
|
||||
*
|
||||
* Returns:
|
||||
* - sha1 checksum
|
||||
* - file size
|
||||
* - content type
|
||||
* - file ID
|
||||
*
|
||||
* GOTCHA: Sometimes there is no checksum available via B2. This is because of how the files were uploaded (multipart.)
|
||||
*
|
||||
* Throws on any CLI error or missing fields.
|
||||
*/
|
||||
export async function getB2FileInfo(job: Job, s3Key: string): Promise<FileInfo> {
|
||||
const cmd = "b2";
|
||||
const args = ["file", "info", `b2://${env.AWS_BUCKET}/${s3Key}`];
|
||||
|
||||
let stdout: string;
|
||||
|
||||
|
||||
try {
|
||||
const result = await spawn(cmd, args);
|
||||
stdout = result.stdout;
|
||||
} catch (err: any) {
|
||||
throw new Error(`Failed to run 'b2 file info': ${err.stderr || err.message}`);
|
||||
}
|
||||
|
||||
let data: any;
|
||||
try {
|
||||
data = JSON.parse(stdout);
|
||||
|
||||
} catch (err) {
|
||||
throw new Error(`Failed to parse JSON from B2 CLI: ${stdout}`);
|
||||
}
|
||||
|
||||
if (!data.contentSha1 || !data.size) {
|
||||
throw new Error(`Unexpected B2 file info format for key ${s3Key}. contentSha1 or size was missing.`);
|
||||
}
|
||||
|
||||
|
||||
return {
|
||||
sha1: data.contentSha1,
|
||||
size: data.size,
|
||||
contentType: data.contentType || "application/octet-stream",
|
||||
fileId: data.fileId
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* getFileFromCache
|
||||
*
|
||||
* Get a sourceVideo file from cache. If not available in the cache, the file is downloaded and cached.
|
||||
*
|
||||
* Remember to make processors
|
||||
* * idempotent
|
||||
* * fail fast
|
||||
* * DRY
|
||||
*/
|
||||
export default async function cacheGet(job: Job) {
|
||||
|
||||
assertPayload(job.data);
|
||||
const pb = await getPocketBaseClient();
|
||||
const vodId = job.data.vodId;
|
||||
const vod = await pb.collection('vods').getOne(vodId);
|
||||
|
||||
const sourceVideo = vod.sourceVideo;
|
||||
if (!sourceVideo) throw new Error(`vod ${vodId} is missing a sourceVideo.`);
|
||||
|
||||
// 0. get filesize from B2. (They don't reliably offer a checksum so size is a compromise.)
|
||||
const info = await getB2FileInfo(job, vod.sourceVideo);
|
||||
|
||||
|
||||
// 1. Determine local path. Use the sha1 for the cachePath if available, otherwise use the filesize.
|
||||
const cachePath = join(cacheRoot, 'vods', vodId, `sourceVideo`, `${info.size}${extname(vod.sourceVideo)}`);
|
||||
|
||||
// 1.5 ensure cache dir
|
||||
await mkdir(dirname(cachePath), { recursive: true });
|
||||
|
||||
|
||||
// 2. check if cached
|
||||
if ((await fileExists(cachePath))) {
|
||||
job.log(`cache HIT. ${cachePath}`);
|
||||
return cachePath;
|
||||
}
|
||||
|
||||
// 3. queue deterministic download job
|
||||
job.log(`Cache MISS. Downloading vod ${vodId} to ${cachePath}...`);
|
||||
const downloadJob = await downloadQueue.add(
|
||||
'download',
|
||||
{ vodId, cachePath },
|
||||
{ jobId: `download-${vodId}` }
|
||||
);
|
||||
|
||||
|
||||
// 4. wait for download to finish
|
||||
await downloadJob.waitUntilFinished(downloadQueueEvents, 1000 * 60 * 60 * 3);
|
||||
|
||||
// 4.5 set access times (used for cache cleanup)
|
||||
await utimes(cachePath, Date.now(), Date.now());
|
||||
|
||||
job.log(`cacheGet complete with file downloaded to ${cachePath}`);
|
||||
|
||||
|
||||
}
|
||||
|
||||
95
services/worker/src/processors/download.ts
Normal file
95
services/worker/src/processors/download.ts
Normal file
@ -0,0 +1,95 @@
|
||||
|
||||
/**
|
||||
* download.ts
|
||||
*
|
||||
*
|
||||
* A task processor with the sole task of downloading a file from S3
|
||||
*
|
||||
* Generally, don't call this processor directly. Instead, use getFromCache processor, which will call this processor only if necessary.
|
||||
*
|
||||
* Other processors may depend on this one.
|
||||
*
|
||||
* This processor uses deterministic Job names so only one download of a specific file can occur at any given time. (locking)
|
||||
*
|
||||
* This processor saves files to a local cache so repeated calls to download the same file don't need to use any bandwidth.
|
||||
*
|
||||
* This processor is capable of downloading the following. Please add more if needed.
|
||||
* * vod.sourceVideo
|
||||
*
|
||||
* ### Deterministic Job name format
|
||||
* `download-${vod.id}-${vod.sourceVideo}`
|
||||
*
|
||||
*
|
||||
*/
|
||||
|
||||
|
||||
import { Job } from "bullmq";
|
||||
import { getPocketBaseClient } from "../util/pocketbase";
|
||||
import spawn from 'nano-spawn';
|
||||
import env from "../../.config/env";
|
||||
import { getB2FileInfo } from "./cacheGet";
|
||||
import { stat } from "fs/promises";
|
||||
|
||||
interface Payload {
|
||||
vodId: string;
|
||||
cachePath: string;
|
||||
}
|
||||
|
||||
function assertPayload(payload: any): asserts payload is Payload {
|
||||
if (typeof payload !== "object" || !payload) throw new Error("invalid payload-- was not an object.");
|
||||
if (typeof payload.vodId !== "string") throw new Error("invalid payload-- was missing vodId");
|
||||
if (typeof payload.cachePath !== "string") throw new Error('invalid payload-- missing cachePath.');
|
||||
}
|
||||
|
||||
async function monitorProgress(cachePath: string, expectedSize: number, job: Job) {
|
||||
let stopped = false;
|
||||
|
||||
const tick = async () => {
|
||||
if (stopped) return;
|
||||
|
||||
try {
|
||||
const { size } = await stat(cachePath);
|
||||
const progress = Math.min((size / expectedSize) * 100, 100);
|
||||
await job.updateProgress(progress);
|
||||
} catch {
|
||||
// file might not exist yet
|
||||
}
|
||||
|
||||
// schedule next tick
|
||||
if (!stopped) setTimeout(tick, 5_000);
|
||||
};
|
||||
|
||||
// start the first tick
|
||||
tick();
|
||||
|
||||
// return a function to stop the monitor
|
||||
return () => {
|
||||
stopped = true;
|
||||
};
|
||||
}
|
||||
|
||||
export async function __download(job: Job, s3Key: string, cachePath: string) {
|
||||
if (!job) throw new Error('Job arg0 missing');
|
||||
if (!s3Key) throw new Error('s3Key arg1 missing');
|
||||
if (!cachePath) throw new Error('cachePath arg2 missing');
|
||||
job.log(`downloading ${s3Key} to ${cachePath}...`);
|
||||
|
||||
|
||||
const { size } = (await getB2FileInfo(job, s3Key));
|
||||
|
||||
const stopMonitor = await monitorProgress(cachePath, size, job);
|
||||
const { stdout } = await spawn('b2', ['file', 'download', `b2://${env.AWS_BUCKET}/${s3Key}`, cachePath])
|
||||
job.log(stdout);
|
||||
stopMonitor();
|
||||
|
||||
|
||||
job.log('Download complete.');
|
||||
}
|
||||
|
||||
|
||||
export default async function download(job: Job) {
|
||||
assertPayload(job.data);
|
||||
const pb = await getPocketBaseClient();
|
||||
const vod = await pb.collection('vods').getOne(job.data.vodId);
|
||||
await __download(job, vod.sourceVideo, job.data.cachePath);
|
||||
}
|
||||
16
services/worker/src/queues/cacheQueue.ts
Normal file
16
services/worker/src/queues/cacheQueue.ts
Normal file
@ -0,0 +1,16 @@
|
||||
import { Queue } from 'bullmq';
|
||||
import { connection } from '../../.config/bullmq.config';
|
||||
export const cacheQueue = new Queue('cacheQueue', { connection });
|
||||
|
||||
|
||||
await cacheQueue.upsertJobScheduler(
|
||||
'cache-cleanup',
|
||||
{
|
||||
every: 1000 * 60 * 60,
|
||||
},
|
||||
{
|
||||
name: 'cacheCleanup',
|
||||
data: {},
|
||||
opts: {},
|
||||
},
|
||||
);
|
||||
6
services/worker/src/queues/downloadQueue.ts
Normal file
6
services/worker/src/queues/downloadQueue.ts
Normal file
@ -0,0 +1,6 @@
|
||||
import { Queue, QueueEvents } from 'bullmq';
|
||||
import { connection } from '../../.config/bullmq.config';
|
||||
export const downloadQueue = new Queue('downloadQueue', { connection });
|
||||
export const downloadQueueEvents = new QueueEvents("download", {
|
||||
connection,
|
||||
});
|
||||
@ -243,7 +243,7 @@ export class QBittorrentClient {
|
||||
private async addTorrentCreationTask(sourcePath: string): Promise<string> {
|
||||
const torrentFilePath = join(tmpdir(), `${nanoid()}.torrent`);
|
||||
const url = `${this.baseUrl}/api/v2/torrentcreator/addTask`;
|
||||
console.log(`addTorrentCreationTask using sourcePath=${sourcePath}, torrentFilePath=${torrentFilePath}, url=${url}`);
|
||||
console.log(`addTorrentCreationTask using sourcePath=${sourcePath}, url=${url}`);
|
||||
|
||||
|
||||
console.log(`addTorrent using sourcePath=${sourcePath}`)
|
||||
@ -259,7 +259,7 @@ export class QBittorrentClient {
|
||||
sourcePath,
|
||||
isPrivate: "false",
|
||||
format: "hybrid",
|
||||
torrentFilePath: torrentFilePath,
|
||||
torrentFilePath,
|
||||
comment: "https://futureporn.net",
|
||||
source: "https://futureporn.net",
|
||||
// trackers: trackers.join('\n'), // this doesn't work. the two trackers appear on the same line in a single, invalid URL
|
||||
@ -384,11 +384,18 @@ export class QBittorrentClient {
|
||||
const torrentsRes = await fetch(`${this.baseUrl}/api/v2/torrents/info`, {
|
||||
headers: { Cookie: this.sidCookie! },
|
||||
});
|
||||
|
||||
if (!torrentsRes.ok) {
|
||||
console.error('__getTorrentInfos failed to fetch() torrent info.');
|
||||
const body = await torrentsRes.text();
|
||||
console.error(`${torrentsRes.status} ${torrentsRes.statusText} ${body}`);
|
||||
}
|
||||
|
||||
const torrents = await torrentsRes.json() as Array<{ hash: string; name: string }>;
|
||||
const torrent = torrents.find((t) => t.name === torrentName) as QBTorrentInfo;
|
||||
|
||||
if (!torrent) {
|
||||
throw new Error(`Torrent ${torrentName} not found in qBittorrent after adding`);
|
||||
throw new Error(`__getTorrentInfos failure. Torrent ${torrentName} not found in qBittorrent after adding`);
|
||||
}
|
||||
return torrent;
|
||||
}
|
||||
@ -415,7 +422,7 @@ export class QBittorrentClient {
|
||||
*/
|
||||
private async __addTorrent(localFilePath: string): Promise<void> {
|
||||
|
||||
console.log(`addTorrent using localFilePath=${localFilePath}`)
|
||||
console.log(`__addTorrent using localFilePath=${localFilePath}`)
|
||||
|
||||
if (!this.sidCookie) {
|
||||
throw new Error("Not connected. (SID cookie missing.)");
|
||||
@ -443,12 +450,16 @@ export class QBittorrentClient {
|
||||
|
||||
if (!res.ok) {
|
||||
const body = await res.text();
|
||||
throw new Error(`addTorrent failed: ${res.status} ${res.statusText} ${body}`);
|
||||
throw new Error(`__addTorrent failed: ${res.status} ${res.statusText} ${body}`);
|
||||
}
|
||||
|
||||
console.log('addTorrent success.');
|
||||
console.log('__addTorrent success.');
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @deprecated use __getTorrentInfos instead
|
||||
*/
|
||||
async getMagnetLink(fileName: string): Promise<string> {
|
||||
console.log(`getMagnetLink using fileName=${fileName}`)
|
||||
|
||||
@ -484,10 +495,11 @@ export class QBittorrentClient {
|
||||
// 4. add the torrent to qBittorrent
|
||||
// We *could* add the torrent in the torrentcreator,
|
||||
// but that usually errors right away,
|
||||
// so we add it here instead.
|
||||
// so we add it here instead. More robust this way.
|
||||
await this.__addTorrent(torrentFilePath);
|
||||
|
||||
// 5. Get magnet link
|
||||
console.log('lets get the torrent infos');
|
||||
const info = await this.__getTorrentInfos(basename(localFilePath))
|
||||
const magnetLink = info.magnet_uri;
|
||||
|
||||
|
||||
23
services/worker/src/util/sftp.spec.ts
Normal file
23
services/worker/src/util/sftp.spec.ts
Normal file
@ -0,0 +1,23 @@
|
||||
import { test, describe, expect } from 'vitest';
|
||||
import { join } from "node:path";
|
||||
import { sshClient } from './sftp.ts';
|
||||
|
||||
const fixturesDir = join(import.meta.dirname, '..', 'fixtures');
|
||||
const remoteUploadDir = "/upload"
|
||||
|
||||
describe('sftp integration', () => {
|
||||
|
||||
test("upload file", async () => {
|
||||
let filePath = join(fixturesDir, 'pizza.avif');
|
||||
|
||||
await expect(
|
||||
sshClient.uploadFile(filePath, remoteUploadDir)
|
||||
).resolves.toBeUndefined();
|
||||
});
|
||||
|
||||
test("uploadFile rejects on missing local file", async () => {
|
||||
await expect(
|
||||
sshClient.uploadFile("/does/not/exist.jpg", remoteUploadDir)
|
||||
).rejects.toThrow();
|
||||
});
|
||||
})
|
||||
@ -37,6 +37,12 @@ export class SSHClient {
|
||||
this.connected = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves or initializes the SFTP session.
|
||||
* @private
|
||||
* @returns {Promise<SFTPWrapper>} The SFTP session instance.
|
||||
* @throws {Error} If creating the SFTP session fails.
|
||||
*/
|
||||
private async getSFTP(): Promise<SFTPWrapper> {
|
||||
if (!this.sftp) {
|
||||
this.sftp = await new Promise((resolve, reject) => {
|
||||
@ -49,6 +55,12 @@ export class SSHClient {
|
||||
return this.sftp;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a command on the remote SSH host.
|
||||
* @param {string} command - The command to run.
|
||||
* @returns {Promise<string>} The stdout output of the command.
|
||||
* @throws {Error} If the command exits with a non-zero code.
|
||||
*/
|
||||
async exec(command: string): Promise<string> {
|
||||
await this.connect();
|
||||
return new Promise<string>((resolve, reject) => {
|
||||
@ -67,6 +79,13 @@ export class SSHClient {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Uploads a local file to a directory on the remote server via SFTP.
|
||||
* @param {string} localFilePath - Path to the local file.
|
||||
* @param {string} remoteDir - Remote directory where the file will be stored.
|
||||
* @returns {Promise<void>}
|
||||
* @throws {Error} If the upload fails.
|
||||
*/
|
||||
async uploadFile(localFilePath: string, remoteDir: string): Promise<void> {
|
||||
console.log(`Uploading localFilePath=${localFilePath} to remoteDir=${remoteDir}...`);
|
||||
|
||||
@ -89,6 +108,13 @@ export class SSHClient {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Downloads a file from the remote server via SFTP.
|
||||
* @param {string} remoteFilePath - Path to the file on the remote server.
|
||||
* @param {string} localPath - Local destination path.
|
||||
* @returns {Promise<void>}
|
||||
* @throws {Error} If the download fails.
|
||||
*/
|
||||
async downloadFile(remoteFilePath: string, localPath: string): Promise<void> {
|
||||
console.log(`downloading remoteFilePath=${remoteFilePath} to localPath=${localPath}`)
|
||||
await this.connect();
|
||||
@ -99,20 +125,60 @@ export class SSHClient {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Ends the SSH session and resets state.
|
||||
* @returns {void}
|
||||
*/
|
||||
end(): void {
|
||||
this.client.end();
|
||||
this.connected = false;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Downloads a file onto the remote host using `wget`.
|
||||
* @param {string} url - The URL to fetch from the internet.
|
||||
* @param {string} remoteDir - The directory on the remote host where the file will be saved.
|
||||
* @param {string[]} [extraArgs] - Additional arguments to pass to wget (e.g., ["--quiet"]).
|
||||
* @returns {Promise<string>} The full remote path of the downloaded file.
|
||||
* @throws {Error} If wget exits with a non-zero status.
|
||||
*/
|
||||
async downloadRemoteUrl(
|
||||
url: string,
|
||||
remoteDir: string,
|
||||
extraArgs: string[] = []
|
||||
): Promise<string> {
|
||||
await this.connect();
|
||||
|
||||
// Extract file name from URL.
|
||||
const fileName = url.split("/").pop() || "downloaded.file";
|
||||
const remoteFilePath = path.posix.join(remoteDir, fileName);
|
||||
|
||||
// Build wget command.
|
||||
const args = [
|
||||
`-O "${remoteFilePath}"`,
|
||||
...extraArgs.map((arg) => `"${arg}"`),
|
||||
`"${url}"`
|
||||
].join(" ");
|
||||
|
||||
const command = `wget ${args}`;
|
||||
|
||||
await this.exec(command);
|
||||
|
||||
return remoteFilePath;
|
||||
}
|
||||
|
||||
// --- usage helper ---
|
||||
const url = URL.parse(env.SEEDBOX_SFTP_URL);
|
||||
const hostname = url?.hostname;
|
||||
const port = url?.port;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Preconfigured SSHClient instance using environment-defined credentials.
|
||||
*/
|
||||
export const sshClient = new SSHClient({
|
||||
host: hostname!,
|
||||
port: port ? parseInt(port) : 22,
|
||||
host: env.SEEDBOX_SFTP_HOST,
|
||||
port: parseInt(env.SEEDBOX_SFTP_PORT),
|
||||
username: env.SEEDBOX_SFTP_USERNAME,
|
||||
password: env.SEEDBOX_SFTP_PASSWORD,
|
||||
});
|
||||
|
||||
26
services/worker/src/workers/cacheWorker.ts
Normal file
26
services/worker/src/workers/cacheWorker.ts
Normal file
@ -0,0 +1,26 @@
|
||||
import { Worker } from 'bullmq';
|
||||
import { connection } from '../../.config/bullmq.config.ts';
|
||||
import cacheGet from '../processors/cacheGet.ts';
|
||||
import cacheCleanup from '../processors/cacheCleanup.ts';
|
||||
|
||||
const workerName = 'cacheWorker';
|
||||
const queueName = 'cacheQueue';
|
||||
|
||||
new Worker(
|
||||
queueName,
|
||||
async (job) => {
|
||||
console.log(`${workerName}. we got a job on the ${queueName}. data=${JSON.stringify(job.data)}, job name=${job.name}`);
|
||||
switch (job.name) {
|
||||
case 'cacheGet':
|
||||
return await cacheGet(job);
|
||||
case 'cacheCleanup':
|
||||
return await cacheCleanup(job);
|
||||
|
||||
default:
|
||||
throw new Error(`${workerName} Unknown job name: ${job.name}`);
|
||||
}
|
||||
},
|
||||
{ connection }
|
||||
);
|
||||
|
||||
console.log(`${workerName} is running...`);
|
||||
23
services/worker/src/workers/downloadWorker.ts
Normal file
23
services/worker/src/workers/downloadWorker.ts
Normal file
@ -0,0 +1,23 @@
|
||||
import { Worker } from 'bullmq';
|
||||
import { connection } from '../../.config/bullmq.config.ts';
|
||||
import download from '../processors/download.ts';
|
||||
|
||||
const workerName = 'downloadWorker';
|
||||
const queueName = 'downloadQueue';
|
||||
|
||||
new Worker(
|
||||
queueName,
|
||||
async (job) => {
|
||||
console.log(`${workerName}. we got a job on the ${queueName}. data=${JSON.stringify(job.data)}, job name=${job.name}`);
|
||||
switch (job.name) {
|
||||
case 'download':
|
||||
return await download(job);
|
||||
|
||||
default:
|
||||
throw new Error(`${workerName} Unknown job name: ${job.name}`);
|
||||
}
|
||||
},
|
||||
{ connection }
|
||||
);
|
||||
|
||||
console.log(`${workerName} is running...`);
|
||||
@ -4,16 +4,24 @@ import { Worker } from 'bullmq';
|
||||
import { connection } from '../../.config/bullmq.config.ts';
|
||||
import { syncronizePatreon } from '../processors/syncronizePatreon.ts'
|
||||
import { getAnnounceUrlDetails } from '../processors/getAnnounceUrlDetails.ts'
|
||||
import { createTorrent } from '../processors/createTorrent.ts';
|
||||
import { copyV1VideoToV3 } from '../processors/copyV1VideoToV3.ts';
|
||||
|
||||
new Worker(
|
||||
'highPriorityQueue',
|
||||
async (job) => {
|
||||
console.log('highPriorityWorker. we got a job on the highPriorityQueue.', job.data, job.name);
|
||||
console.log('highPriorityWorker. We got a job on the highPriorityQueue.', job.data, job.name);
|
||||
switch (job.name) {
|
||||
case '':
|
||||
throw new Error('missing job name.')
|
||||
case 'syncronizePatreon':
|
||||
return await syncronizePatreon(job);
|
||||
case 'getAnnounceUrlDetails':
|
||||
return await getAnnounceUrlDetails(job);
|
||||
case 'createTorrent':
|
||||
return await createTorrent(job);
|
||||
case 'copyV1VideoToV3':
|
||||
return await copyV1VideoToV3(job);
|
||||
default:
|
||||
throw new Error(`Unknown job name: ${job.name}`);
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user