diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 232f6be2..f1cdfbc6 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -18,6 +18,16 @@ "problemMatcher": [], "isBackground": true }, + { + "label": "Run sftp dev server", + "type": "shell", + "command": "docker run --name=futureporn-sftp-DEV --rm -it -v /home/cj/Documents/futureporn-monorepo/services/worker/src/fixtures/sftp:/home/fp -p 2222:22 atmoz/sftp fp:password:1000", + "problemMatcher": [], + "isBackground": true, + "runOptions": { + "runOn": "folderOpen" + } + }, { "label": "Run postgres", "type": "shell", diff --git a/services/pocketbase/pb_hooks/pages/(site)/vods/[id]/index.ejs b/services/pocketbase/pb_hooks/pages/(site)/vods/[id]/index.ejs index 3ef06272..60f96349 100644 --- a/services/pocketbase/pb_hooks/pages/(site)/vods/[id]/index.ejs +++ b/services/pocketbase/pb_hooks/pages/(site)/vods/[id]/index.ejs @@ -78,11 +78,16 @@ <% } %> <% if (data.vod?.get('magnetLink')) { %> -

Magnet Link: <%= data.vod?.get('magnetLink') %>

+

Magnet Link: + + + + +

<% } %> <% if (data.vod?.get('notes')) { %> -

Notes:

+

Notes:

<%- data.vod?.get('notes') %>
<% } %> diff --git a/services/pocketbase/pb_hooks/pages/vt/+middleware.js b/services/pocketbase/pb_hooks/pages/vt/+middleware.js new file mode 100644 index 00000000..b1ae25cb --- /dev/null +++ b/services/pocketbase/pb_hooks/pages/vt/+middleware.js @@ -0,0 +1,11 @@ + +/** + This directory is solely for redirecting futureporn v2 paths + + ex: /vt/el_xox/vod/20240317T214358Z redirects to /vods/kaovo483w4f4rc1 + */ + + +module.exports = function ({ redirect }, next) { + next() +} diff --git a/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240121T034229Z.ejs b/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240121T034229Z.ejs new file mode 100644 index 00000000..d645aecc --- /dev/null +++ b/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240121T034229Z.ejs @@ -0,0 +1,5 @@ + \ No newline at end of file diff --git a/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240202T232030Z.ejs b/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240202T232030Z.ejs new file mode 100644 index 00000000..9506e95e --- /dev/null +++ b/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240202T232030Z.ejs @@ -0,0 +1,5 @@ + \ No newline at end of file diff --git a/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240301T233846Z.ejs b/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240301T233846Z.ejs new file mode 100644 index 00000000..f49df06b --- /dev/null +++ b/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240301T233846Z.ejs @@ -0,0 +1,5 @@ + \ No newline at end of file diff --git a/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240310T030712Z.ejs b/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240310T030712Z.ejs new file mode 100644 index 00000000..f7266116 --- /dev/null +++ b/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240310T030712Z.ejs @@ -0,0 +1,5 @@ + \ No newline at end of file diff --git a/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240317T214358Z.ejs b/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240317T214358Z.ejs new file mode 100644 index 00000000..68de1c73 --- /dev/null +++ b/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240317T214358Z.ejs @@ -0,0 +1,5 @@ + \ No newline at end of file diff --git a/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240407T043427Z.ejs b/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240407T043427Z.ejs new file mode 100644 index 00000000..b827431d --- /dev/null +++ b/services/pocketbase/pb_hooks/pages/vt/el_xox/vod/20240407T043427Z.ejs @@ -0,0 +1,5 @@ + \ No newline at end of file diff --git a/services/pocketbase/pb_hooks/pages/vt/projektmelody/vod/20220907T231300Z.ejs b/services/pocketbase/pb_hooks/pages/vt/projektmelody/vod/20220907T231300Z.ejs new file mode 100644 index 00000000..ac3dbd90 --- /dev/null +++ b/services/pocketbase/pb_hooks/pages/vt/projektmelody/vod/20220907T231300Z.ejs @@ -0,0 +1,5 @@ + \ No newline at end of file diff --git a/services/pocketbase/pb_hooks/pages/vt/projektmelody/vod/20231012T032454Z.ejs b/services/pocketbase/pb_hooks/pages/vt/projektmelody/vod/20231012T032454Z.ejs new file mode 100644 index 00000000..32ec93a1 --- /dev/null +++ b/services/pocketbase/pb_hooks/pages/vt/projektmelody/vod/20231012T032454Z.ejs @@ -0,0 +1,5 @@ + \ No newline at end of file diff --git a/services/pocketbase/pb_hooks/pages/vt/projektmelody/vod/20250111T021845Z.ejs b/services/pocketbase/pb_hooks/pages/vt/projektmelody/vod/20250111T021845Z.ejs new file mode 100644 index 00000000..4eb8688c --- /dev/null +++ b/services/pocketbase/pb_hooks/pages/vt/projektmelody/vod/20250111T021845Z.ejs @@ -0,0 +1,5 @@ + \ No newline at end of file diff --git a/services/worker/.config/env.ts b/services/worker/.config/env.ts index 14dad536..51bb9a30 100644 --- a/services/worker/.config/env.ts +++ b/services/worker/.config/env.ts @@ -28,14 +28,15 @@ const env = (() => { if (!process.env.APIFY_TOKEN) throw new Error('APIFY_TOKEN missing in env'); if (!process.env.NODE_ENV) throw new Error('APIFY_TOKEN missing in env'); if (!process.env.CACHE_ROOT) throw new Error('CACHE_ROOT missing in env'); - if (!process.env.SEEDBOX_SFTP_URL) throw new Error('SEEDBOX_SFTP_URL missing in env'); + if (!process.env.SEEDBOX_SFTP_HOST) throw new Error('SEEDBOX_SFTP_HOST missing in env'); + if (!process.env.SEEDBOX_SFTP_PORT) throw new Error('SEEDBOX_SFTP_PORT missing in env'); if (!process.env.SEEDBOX_SFTP_USERNAME) throw new Error('SEEDBOX_SFTP_USERNAME missing in env'); if (!process.env.SEEDBOX_SFTP_PASSWORD) throw new Error('SEEDBOX_SFTP_PASSWORD missing in env'); if (!process.env.QBT_HOST) throw new Error('QBT_HOST missing in env'); if (!process.env.QBT_PORT) throw new Error('QBT_PORT missing in env'); if (!process.env.QBT_PASSWORD) throw new Error('QBT_PASSWORD missing in env'); if (!process.env.QBT_USERNAME) throw new Error('QBT_USERNAME missing in env'); - + if (!process.env.WORKER_WORKERS) throw new Error('WORKER_WORKERS missing in env'); const { PORT, @@ -65,7 +66,8 @@ const env = (() => { APIFY_TOKEN, NODE_ENV, CACHE_ROOT, - SEEDBOX_SFTP_URL, + SEEDBOX_SFTP_HOST, + SEEDBOX_SFTP_PORT, SEEDBOX_SFTP_USERNAME, SEEDBOX_SFTP_PASSWORD, VALKEY_PORT, @@ -73,6 +75,7 @@ const env = (() => { QBT_USERNAME, QBT_PASSWORD, QBT_PORT, + WORKER_WORKERS, } = process.env return { PORT, @@ -102,7 +105,8 @@ const env = (() => { APIFY_TOKEN, NODE_ENV, CACHE_ROOT, - SEEDBOX_SFTP_URL, + SEEDBOX_SFTP_HOST, + SEEDBOX_SFTP_PORT, SEEDBOX_SFTP_USERNAME, SEEDBOX_SFTP_PASSWORD, VALKEY_PORT, @@ -110,6 +114,7 @@ const env = (() => { QBT_USERNAME, QBT_PASSWORD, QBT_PORT, + WORKER_WORKERS, } })() diff --git a/services/worker/.config/qBittorrent/config/categories.json b/services/worker/.config/qBittorrent/config/categories.json deleted file mode 100644 index 2c63c085..00000000 --- a/services/worker/.config/qBittorrent/config/categories.json +++ /dev/null @@ -1,2 +0,0 @@ -{ -} diff --git a/services/worker/.config/qBittorrent/config/rss/feeds.json b/services/worker/.config/qBittorrent/config/rss/feeds.json deleted file mode 100644 index 2c63c085..00000000 --- a/services/worker/.config/qBittorrent/config/rss/feeds.json +++ /dev/null @@ -1,2 +0,0 @@ -{ -} diff --git a/services/worker/.config/qBittorrent/config/watched_folders.json b/services/worker/.config/qBittorrent/config/watched_folders.json deleted file mode 100644 index 2c63c085..00000000 --- a/services/worker/.config/qBittorrent/config/watched_folders.json +++ /dev/null @@ -1,2 +0,0 @@ -{ -} diff --git a/services/worker/.config/qBittorrent/data/GeoDB/dbip-country-lite.mmdb b/services/worker/.config/qBittorrent/data/GeoDB/dbip-country-lite.mmdb deleted file mode 100644 index b817f052..00000000 Binary files a/services/worker/.config/qBittorrent/data/GeoDB/dbip-country-lite.mmdb and /dev/null differ diff --git a/services/worker/.config/qBittorrent/data/rss/articles/storage.lock b/services/worker/.config/qBittorrent/data/rss/articles/storage.lock deleted file mode 100644 index e69de29b..00000000 diff --git a/services/worker/.config/qBittorrent/config/lockfile b/services/worker/src/fixtures/sftp/data/.gitkeep similarity index 100% rename from services/worker/.config/qBittorrent/config/lockfile rename to services/worker/src/fixtures/sftp/data/.gitkeep diff --git a/services/worker/src/fixtures/sftp/data/2000-01-01-projektmelody-ccg0qhxwjs1i6s1.mp4 b/services/worker/src/fixtures/sftp/data/2000-01-01-projektmelody-ccg0qhxwjs1i6s1.mp4 new file mode 100644 index 00000000..c244dd6b Binary files /dev/null and b/services/worker/src/fixtures/sftp/data/2000-01-01-projektmelody-ccg0qhxwjs1i6s1.mp4 differ diff --git a/services/worker/.config/qBittorrent/config/rss/storage.lock b/services/worker/src/fixtures/sftp/watch/.gitkeep similarity index 100% rename from services/worker/.config/qBittorrent/config/rss/storage.lock rename to services/worker/src/fixtures/sftp/watch/.gitkeep diff --git a/services/worker/src/fixtures/sftp/watch/ecf6af55-d464-4e32-9a65-28365102fd6d.torrent b/services/worker/src/fixtures/sftp/watch/ecf6af55-d464-4e32-9a65-28365102fd6d.torrent new file mode 100644 index 00000000..dff32dbd Binary files /dev/null and b/services/worker/src/fixtures/sftp/watch/ecf6af55-d464-4e32-9a65-28365102fd6d.torrent differ diff --git a/services/worker/src/index.ts b/services/worker/src/index.ts index 83668aeb..c686f344 100644 --- a/services/worker/src/index.ts +++ b/services/worker/src/index.ts @@ -8,6 +8,8 @@ import { gpuQueue } from './queues/gpuQueue.ts'; import { highPriorityQueue } from './queues/highPriorityQueue.ts'; import env from '../.config/env.ts'; import { version } from '../package.json'; +import { downloadQueue } from './queues/downloadQueue.ts'; +import { cacheQueue } from './queues/cacheQueue.ts'; const run = async () => { @@ -24,22 +26,16 @@ const run = async () => { new BullMQAdapter(highPriorityQueue), new BullMQAdapter(generalQueue), new BullMQAdapter(gpuQueue), + new BullMQAdapter(downloadQueue), + new BullMQAdapter(cacheQueue), ], serverAdapter, }); - console.log('importing workers'); - if (process.env.NODE_ENV === 'development') { - await import('./workers/highPriorityWorker.ts'); - await import('./workers/generalWorker.ts'); - await import('./workers/gpuWorker.ts'); - } else { - // @todo I separated these so that they can be ran on multiple machines. - // @todo we should activate these based on environment variable feature flags, not NODE_ENV - await import('./workers/highPriorityWorker.ts'); - await import('./workers/generalWorker.ts'); - // await import('./workers/gpuWorker.ts'); // @todo implement - } + process.env.WORKER_WORKERS?.split(',').forEach(async (worker) => { + console.log(`👷 Importing ${worker}`); + await import(`./workers/${worker}.ts`); + }) app.get('/', (req: Request, res: Response) => { res.send(` @@ -74,8 +70,10 @@ const run = async () => {
  • Bull Dashboard
  • Task: presignMuxAssets
  • Task: copyV1VideoAll
  • +
  • Task: copyV1VideoToV3
  • Task: createTorrent
  • Task: createMuxAsset
  • +
  • Task: cacheGet
  • `) }) @@ -86,7 +84,7 @@ const run = async () => { const name = req.query.name as string; const vodId = req.query.vodId as string; // console.log('vodId', vodId, 'name', name); - // console.log(JSON.stringify(req.query, null, 2)) + // console.log(JSON.stringify(req.query, null, 2)); const data = { vodId }; @@ -96,6 +94,12 @@ const run = async () => { case 'scheduleVodProcessing': await gpuQueue.add(name, data); break; + case 'cacheGet': + await cacheQueue.add(name, data); + break; + case 'download': + await downloadQueue.add(name, data); + break; default: await highPriorityQueue.add(name, data); break; diff --git a/services/worker/src/processors/_Template.ts b/services/worker/src/processors/_Template.ts index b318e5c8..29f5601b 100644 --- a/services/worker/src/processors/_Template.ts +++ b/services/worker/src/processors/_Template.ts @@ -3,9 +3,17 @@ import { getPocketBaseClient } from "../util/pocketbase"; import Client from "pocketbase"; import { Vod } from "../types"; +interface Payload { + vodId: string; +} const foo = 'bar'; +function assertPayload(payload: any): asserts payload is Payload { + if (typeof payload !== "object" || !payload) throw new Error("invalid payload-- was not an object."); + if (typeof payload.vodId !== "string") throw new Error("invalid payload-- was missing vodId"); +} + /** * barFunction * @@ -36,17 +44,18 @@ async function getApplicableVods(pb: Client) { * * A template to copy for when we make new processors. Shows the general scaffolding. * - * Remember to makes processors + * Remember to make processors * * idempotent * * fail fast * * DRY */ -export async function aTemplate(job: Job) { +export default async function aTemplate(job: Job) { + assertPayload(job.data); const pb = await getPocketBaseClient(); const vods = await getApplicableVods(pb); - job.log(`getAnnounceUrlDetails found ${vods.length} vods in need of a streamDate.`) + job.log(`_Template found ${vods.length} vods in need of a streamDate.`) for (let i = 0; i < vods.length; i++) { const vod = vods[i] as unknown as Vod; diff --git a/services/worker/src/processors/cacheCleanup.ts b/services/worker/src/processors/cacheCleanup.ts new file mode 100644 index 00000000..e7d51006 --- /dev/null +++ b/services/worker/src/processors/cacheCleanup.ts @@ -0,0 +1,44 @@ +import { Job } from "bullmq"; +import fs from "node:fs/promises"; +import path from "node:path"; +import env from "../../.config/env"; + +const retainmentDayCount = 7; + +/** + * cacheCleanup + * + * Deletes files in the cache directory that are older than retainmentDayCount days + */ +export default async function cacheCleanup(job: Job) { + const cacheDir = env.CACHE_ROOT; + let cleanedCount = 0; + + try { + // read all files in the cache directory + const files = await fs.readdir(cacheDir); + + const now = Date.now(); + const retainMs = retainmentDayCount * 24 * 60 * 60 * 1000; // days → ms + + for (const file of files) { + const filePath = path.join(cacheDir, file); + try { + const stat = await fs.stat(filePath); + // only delete files older than retainment + if (now - stat.mtimeMs > retainMs) { + await fs.unlink(filePath); + cleanedCount++; + } + } catch (err) { + // skip errors per-file, but log them + job.log(`failed to check/delete ${file}: ${(err as Error).message}`); + } + } + + job.log(`cleaned ${cleanedCount} stale cache files`); + } catch (err) { + job.log(`cache cleanup failed: ${(err as Error).message}`); + throw err; // allow BullMQ to handle retry/failure + } +} diff --git a/services/worker/src/processors/cacheGet.ts b/services/worker/src/processors/cacheGet.ts new file mode 100644 index 00000000..d425177f --- /dev/null +++ b/services/worker/src/processors/cacheGet.ts @@ -0,0 +1,143 @@ +import { Job } from "bullmq"; +import { getPocketBaseClient } from "../util/pocketbase"; +import env from "../../.config/env"; +import { join, extname, dirname } from "node:path"; +import spawn from "nano-spawn"; +import { stat, mkdir, utimes } from "node:fs/promises"; +import { downloadQueue, downloadQueueEvents } from "../queues/downloadQueue"; + +interface Payload { + vodId: string; +} + +interface FileInfo { + sha1: string | 'none'; // annoying 'none' has to be dealt with + size: number; + contentType: string; + fileId: string; +} + +const cacheRoot = env.CACHE_ROOT; + +function assertPayload(payload: any): asserts payload is Payload { + if (typeof payload !== "object" || !payload) throw new Error("invalid payload-- was not an object."); + if (typeof payload.vodId !== "string") throw new Error("invalid payload-- was missing vodId"); +} + + +async function fileExists(path: string) { + try { + return (await stat(path)).isFile(); + } catch { + return false; + } +} + +/** + * Fetches metadata for a B2 object using the CLI. + * Uses: `b2 file info b2://` + * + * Returns: + * - sha1 checksum + * - file size + * - content type + * - file ID + * + * GOTCHA: Sometimes there is no checksum available via B2. This is because of how the files were uploaded (multipart.) + * + * Throws on any CLI error or missing fields. + */ +export async function getB2FileInfo(job: Job, s3Key: string): Promise { + const cmd = "b2"; + const args = ["file", "info", `b2://${env.AWS_BUCKET}/${s3Key}`]; + + let stdout: string; + + + try { + const result = await spawn(cmd, args); + stdout = result.stdout; + } catch (err: any) { + throw new Error(`Failed to run 'b2 file info': ${err.stderr || err.message}`); + } + + let data: any; + try { + data = JSON.parse(stdout); + + } catch (err) { + throw new Error(`Failed to parse JSON from B2 CLI: ${stdout}`); + } + + if (!data.contentSha1 || !data.size) { + throw new Error(`Unexpected B2 file info format for key ${s3Key}. contentSha1 or size was missing.`); + } + + + return { + sha1: data.contentSha1, + size: data.size, + contentType: data.contentType || "application/octet-stream", + fileId: data.fileId + }; +} + + +/** + * + * getFileFromCache + * + * Get a sourceVideo file from cache. If not available in the cache, the file is downloaded and cached. + * + * Remember to make processors + * * idempotent + * * fail fast + * * DRY + */ +export default async function cacheGet(job: Job) { + + assertPayload(job.data); + const pb = await getPocketBaseClient(); + const vodId = job.data.vodId; + const vod = await pb.collection('vods').getOne(vodId); + + const sourceVideo = vod.sourceVideo; + if (!sourceVideo) throw new Error(`vod ${vodId} is missing a sourceVideo.`); + + // 0. get filesize from B2. (They don't reliably offer a checksum so size is a compromise.) + const info = await getB2FileInfo(job, vod.sourceVideo); + + + // 1. Determine local path. Use the sha1 for the cachePath if available, otherwise use the filesize. + const cachePath = join(cacheRoot, 'vods', vodId, `sourceVideo`, `${info.size}${extname(vod.sourceVideo)}`); + + // 1.5 ensure cache dir + await mkdir(dirname(cachePath), { recursive: true }); + + + // 2. check if cached + if ((await fileExists(cachePath))) { + job.log(`cache HIT. ${cachePath}`); + return cachePath; + } + + // 3. queue deterministic download job + job.log(`Cache MISS. Downloading vod ${vodId} to ${cachePath}...`); + const downloadJob = await downloadQueue.add( + 'download', + { vodId, cachePath }, + { jobId: `download-${vodId}` } + ); + + + // 4. wait for download to finish + await downloadJob.waitUntilFinished(downloadQueueEvents, 1000 * 60 * 60 * 3); + + // 4.5 set access times (used for cache cleanup) + await utimes(cachePath, Date.now(), Date.now()); + + job.log(`cacheGet complete with file downloaded to ${cachePath}`); + + +} + diff --git a/services/worker/src/processors/download.ts b/services/worker/src/processors/download.ts new file mode 100644 index 00000000..e1264ce0 --- /dev/null +++ b/services/worker/src/processors/download.ts @@ -0,0 +1,95 @@ + +/** + * download.ts + * + * + * A task processor with the sole task of downloading a file from S3 + * + * Generally, don't call this processor directly. Instead, use getFromCache processor, which will call this processor only if necessary. + * + * Other processors may depend on this one. + * + * This processor uses deterministic Job names so only one download of a specific file can occur at any given time. (locking) + * + * This processor saves files to a local cache so repeated calls to download the same file don't need to use any bandwidth. + * + * This processor is capable of downloading the following. Please add more if needed. + * * vod.sourceVideo + * + * ### Deterministic Job name format + * `download-${vod.id}-${vod.sourceVideo}` + * + * + */ + + +import { Job } from "bullmq"; +import { getPocketBaseClient } from "../util/pocketbase"; +import spawn from 'nano-spawn'; +import env from "../../.config/env"; +import { getB2FileInfo } from "./cacheGet"; +import { stat } from "fs/promises"; + +interface Payload { + vodId: string; + cachePath: string; +} + +function assertPayload(payload: any): asserts payload is Payload { + if (typeof payload !== "object" || !payload) throw new Error("invalid payload-- was not an object."); + if (typeof payload.vodId !== "string") throw new Error("invalid payload-- was missing vodId"); + if (typeof payload.cachePath !== "string") throw new Error('invalid payload-- missing cachePath.'); +} + +async function monitorProgress(cachePath: string, expectedSize: number, job: Job) { + let stopped = false; + + const tick = async () => { + if (stopped) return; + + try { + const { size } = await stat(cachePath); + const progress = Math.min((size / expectedSize) * 100, 100); + await job.updateProgress(progress); + } catch { + // file might not exist yet + } + + // schedule next tick + if (!stopped) setTimeout(tick, 5_000); + }; + + // start the first tick + tick(); + + // return a function to stop the monitor + return () => { + stopped = true; + }; +} + +export async function __download(job: Job, s3Key: string, cachePath: string) { + if (!job) throw new Error('Job arg0 missing'); + if (!s3Key) throw new Error('s3Key arg1 missing'); + if (!cachePath) throw new Error('cachePath arg2 missing'); + job.log(`downloading ${s3Key} to ${cachePath}...`); + + + const { size } = (await getB2FileInfo(job, s3Key)); + + const stopMonitor = await monitorProgress(cachePath, size, job); + const { stdout } = await spawn('b2', ['file', 'download', `b2://${env.AWS_BUCKET}/${s3Key}`, cachePath]) + job.log(stdout); + stopMonitor(); + + + job.log('Download complete.'); +} + + +export default async function download(job: Job) { + assertPayload(job.data); + const pb = await getPocketBaseClient(); + const vod = await pb.collection('vods').getOne(job.data.vodId); + await __download(job, vod.sourceVideo, job.data.cachePath); +} \ No newline at end of file diff --git a/services/worker/src/queues/cacheQueue.ts b/services/worker/src/queues/cacheQueue.ts new file mode 100644 index 00000000..0ff961fe --- /dev/null +++ b/services/worker/src/queues/cacheQueue.ts @@ -0,0 +1,16 @@ +import { Queue } from 'bullmq'; +import { connection } from '../../.config/bullmq.config'; +export const cacheQueue = new Queue('cacheQueue', { connection }); + + +await cacheQueue.upsertJobScheduler( + 'cache-cleanup', + { + every: 1000 * 60 * 60, + }, + { + name: 'cacheCleanup', + data: {}, + opts: {}, + }, +); \ No newline at end of file diff --git a/services/worker/src/queues/downloadQueue.ts b/services/worker/src/queues/downloadQueue.ts new file mode 100644 index 00000000..ba13aaf5 --- /dev/null +++ b/services/worker/src/queues/downloadQueue.ts @@ -0,0 +1,6 @@ +import { Queue, QueueEvents } from 'bullmq'; +import { connection } from '../../.config/bullmq.config'; +export const downloadQueue = new Queue('downloadQueue', { connection }); +export const downloadQueueEvents = new QueueEvents("download", { + connection, +}); \ No newline at end of file diff --git a/services/worker/src/util/qbittorrent.ts b/services/worker/src/util/qbittorrent.ts index cdb127c3..855492d3 100644 --- a/services/worker/src/util/qbittorrent.ts +++ b/services/worker/src/util/qbittorrent.ts @@ -243,7 +243,7 @@ export class QBittorrentClient { private async addTorrentCreationTask(sourcePath: string): Promise { const torrentFilePath = join(tmpdir(), `${nanoid()}.torrent`); const url = `${this.baseUrl}/api/v2/torrentcreator/addTask`; - console.log(`addTorrentCreationTask using sourcePath=${sourcePath}, torrentFilePath=${torrentFilePath}, url=${url}`); + console.log(`addTorrentCreationTask using sourcePath=${sourcePath}, url=${url}`); console.log(`addTorrent using sourcePath=${sourcePath}`) @@ -259,7 +259,7 @@ export class QBittorrentClient { sourcePath, isPrivate: "false", format: "hybrid", - torrentFilePath: torrentFilePath, + torrentFilePath, comment: "https://futureporn.net", source: "https://futureporn.net", // trackers: trackers.join('\n'), // this doesn't work. the two trackers appear on the same line in a single, invalid URL @@ -384,11 +384,18 @@ export class QBittorrentClient { const torrentsRes = await fetch(`${this.baseUrl}/api/v2/torrents/info`, { headers: { Cookie: this.sidCookie! }, }); + + if (!torrentsRes.ok) { + console.error('__getTorrentInfos failed to fetch() torrent info.'); + const body = await torrentsRes.text(); + console.error(`${torrentsRes.status} ${torrentsRes.statusText} ${body}`); + } + const torrents = await torrentsRes.json() as Array<{ hash: string; name: string }>; const torrent = torrents.find((t) => t.name === torrentName) as QBTorrentInfo; if (!torrent) { - throw new Error(`Torrent ${torrentName} not found in qBittorrent after adding`); + throw new Error(`__getTorrentInfos failure. Torrent ${torrentName} not found in qBittorrent after adding`); } return torrent; } @@ -415,7 +422,7 @@ export class QBittorrentClient { */ private async __addTorrent(localFilePath: string): Promise { - console.log(`addTorrent using localFilePath=${localFilePath}`) + console.log(`__addTorrent using localFilePath=${localFilePath}`) if (!this.sidCookie) { throw new Error("Not connected. (SID cookie missing.)"); @@ -443,12 +450,16 @@ export class QBittorrentClient { if (!res.ok) { const body = await res.text(); - throw new Error(`addTorrent failed: ${res.status} ${res.statusText} ${body}`); + throw new Error(`__addTorrent failed: ${res.status} ${res.statusText} ${body}`); } - console.log('addTorrent success.'); + console.log('__addTorrent success.'); } + /** + * + * @deprecated use __getTorrentInfos instead + */ async getMagnetLink(fileName: string): Promise { console.log(`getMagnetLink using fileName=${fileName}`) @@ -484,10 +495,11 @@ export class QBittorrentClient { // 4. add the torrent to qBittorrent // We *could* add the torrent in the torrentcreator, // but that usually errors right away, - // so we add it here instead. + // so we add it here instead. More robust this way. await this.__addTorrent(torrentFilePath); // 5. Get magnet link + console.log('lets get the torrent infos'); const info = await this.__getTorrentInfos(basename(localFilePath)) const magnetLink = info.magnet_uri; diff --git a/services/worker/src/util/sftp.spec.ts b/services/worker/src/util/sftp.spec.ts new file mode 100644 index 00000000..cead8c26 --- /dev/null +++ b/services/worker/src/util/sftp.spec.ts @@ -0,0 +1,23 @@ +import { test, describe, expect } from 'vitest'; +import { join } from "node:path"; +import { sshClient } from './sftp.ts'; + +const fixturesDir = join(import.meta.dirname, '..', 'fixtures'); +const remoteUploadDir = "/upload" + +describe('sftp integration', () => { + + test("upload file", async () => { + let filePath = join(fixturesDir, 'pizza.avif'); + + await expect( + sshClient.uploadFile(filePath, remoteUploadDir) + ).resolves.toBeUndefined(); + }); + + test("uploadFile rejects on missing local file", async () => { + await expect( + sshClient.uploadFile("/does/not/exist.jpg", remoteUploadDir) + ).rejects.toThrow(); + }); +}) \ No newline at end of file diff --git a/services/worker/src/util/sftp.ts b/services/worker/src/util/sftp.ts index bae1ecae..2a58a9bd 100644 --- a/services/worker/src/util/sftp.ts +++ b/services/worker/src/util/sftp.ts @@ -37,6 +37,12 @@ export class SSHClient { this.connected = true; } + /** + * Retrieves or initializes the SFTP session. + * @private + * @returns {Promise} The SFTP session instance. + * @throws {Error} If creating the SFTP session fails. + */ private async getSFTP(): Promise { if (!this.sftp) { this.sftp = await new Promise((resolve, reject) => { @@ -49,6 +55,12 @@ export class SSHClient { return this.sftp; } + /** + * Executes a command on the remote SSH host. + * @param {string} command - The command to run. + * @returns {Promise} The stdout output of the command. + * @throws {Error} If the command exits with a non-zero code. + */ async exec(command: string): Promise { await this.connect(); return new Promise((resolve, reject) => { @@ -67,6 +79,13 @@ export class SSHClient { }); } + /** + * Uploads a local file to a directory on the remote server via SFTP. + * @param {string} localFilePath - Path to the local file. + * @param {string} remoteDir - Remote directory where the file will be stored. + * @returns {Promise} + * @throws {Error} If the upload fails. + */ async uploadFile(localFilePath: string, remoteDir: string): Promise { console.log(`Uploading localFilePath=${localFilePath} to remoteDir=${remoteDir}...`); @@ -89,6 +108,13 @@ export class SSHClient { }); } + /** + * Downloads a file from the remote server via SFTP. + * @param {string} remoteFilePath - Path to the file on the remote server. + * @param {string} localPath - Local destination path. + * @returns {Promise} + * @throws {Error} If the download fails. + */ async downloadFile(remoteFilePath: string, localPath: string): Promise { console.log(`downloading remoteFilePath=${remoteFilePath} to localPath=${localPath}`) await this.connect(); @@ -99,20 +125,60 @@ export class SSHClient { }); } + /** + * Ends the SSH session and resets state. + * @returns {void} + */ end(): void { this.client.end(); this.connected = false; } + + + /** + * Downloads a file onto the remote host using `wget`. + * @param {string} url - The URL to fetch from the internet. + * @param {string} remoteDir - The directory on the remote host where the file will be saved. + * @param {string[]} [extraArgs] - Additional arguments to pass to wget (e.g., ["--quiet"]). + * @returns {Promise} The full remote path of the downloaded file. + * @throws {Error} If wget exits with a non-zero status. + */ + async downloadRemoteUrl( + url: string, + remoteDir: string, + extraArgs: string[] = [] + ): Promise { + await this.connect(); + + // Extract file name from URL. + const fileName = url.split("/").pop() || "downloaded.file"; + const remoteFilePath = path.posix.join(remoteDir, fileName); + + // Build wget command. + const args = [ + `-O "${remoteFilePath}"`, + ...extraArgs.map((arg) => `"${arg}"`), + `"${url}"` + ].join(" "); + + const command = `wget ${args}`; + + await this.exec(command); + + return remoteFilePath; + } + } -// --- usage helper --- -const url = URL.parse(env.SEEDBOX_SFTP_URL); -const hostname = url?.hostname; -const port = url?.port; + + +/** + * Preconfigured SSHClient instance using environment-defined credentials. + */ export const sshClient = new SSHClient({ - host: hostname!, - port: port ? parseInt(port) : 22, + host: env.SEEDBOX_SFTP_HOST, + port: parseInt(env.SEEDBOX_SFTP_PORT), username: env.SEEDBOX_SFTP_USERNAME, password: env.SEEDBOX_SFTP_PASSWORD, }); diff --git a/services/worker/src/workers/cacheWorker.ts b/services/worker/src/workers/cacheWorker.ts new file mode 100644 index 00000000..fda57042 --- /dev/null +++ b/services/worker/src/workers/cacheWorker.ts @@ -0,0 +1,26 @@ +import { Worker } from 'bullmq'; +import { connection } from '../../.config/bullmq.config.ts'; +import cacheGet from '../processors/cacheGet.ts'; +import cacheCleanup from '../processors/cacheCleanup.ts'; + +const workerName = 'cacheWorker'; +const queueName = 'cacheQueue'; + +new Worker( + queueName, + async (job) => { + console.log(`${workerName}. we got a job on the ${queueName}. data=${JSON.stringify(job.data)}, job name=${job.name}`); + switch (job.name) { + case 'cacheGet': + return await cacheGet(job); + case 'cacheCleanup': + return await cacheCleanup(job); + + default: + throw new Error(`${workerName} Unknown job name: ${job.name}`); + } + }, + { connection } +); + +console.log(`${workerName} is running...`); diff --git a/services/worker/src/workers/downloadWorker.ts b/services/worker/src/workers/downloadWorker.ts new file mode 100644 index 00000000..16dc2774 --- /dev/null +++ b/services/worker/src/workers/downloadWorker.ts @@ -0,0 +1,23 @@ +import { Worker } from 'bullmq'; +import { connection } from '../../.config/bullmq.config.ts'; +import download from '../processors/download.ts'; + +const workerName = 'downloadWorker'; +const queueName = 'downloadQueue'; + +new Worker( + queueName, + async (job) => { + console.log(`${workerName}. we got a job on the ${queueName}. data=${JSON.stringify(job.data)}, job name=${job.name}`); + switch (job.name) { + case 'download': + return await download(job); + + default: + throw new Error(`${workerName} Unknown job name: ${job.name}`); + } + }, + { connection } +); + +console.log(`${workerName} is running...`); diff --git a/services/worker/src/workers/highPriorityWorker.ts b/services/worker/src/workers/highPriorityWorker.ts index 4263d65c..b64c8fb1 100644 --- a/services/worker/src/workers/highPriorityWorker.ts +++ b/services/worker/src/workers/highPriorityWorker.ts @@ -4,16 +4,24 @@ import { Worker } from 'bullmq'; import { connection } from '../../.config/bullmq.config.ts'; import { syncronizePatreon } from '../processors/syncronizePatreon.ts' import { getAnnounceUrlDetails } from '../processors/getAnnounceUrlDetails.ts' +import { createTorrent } from '../processors/createTorrent.ts'; +import { copyV1VideoToV3 } from '../processors/copyV1VideoToV3.ts'; new Worker( 'highPriorityQueue', async (job) => { - console.log('highPriorityWorker. we got a job on the highPriorityQueue.', job.data, job.name); + console.log('highPriorityWorker. We got a job on the highPriorityQueue.', job.data, job.name); switch (job.name) { + case '': + throw new Error('missing job name.') case 'syncronizePatreon': return await syncronizePatreon(job); case 'getAnnounceUrlDetails': return await getAnnounceUrlDetails(job); + case 'createTorrent': + return await createTorrent(job); + case 'copyV1VideoToV3': + return await copyV1VideoToV3(job); default: throw new Error(`Unknown job name: ${job.name}`); }