diff --git a/.vscode/tasks.json b/.vscode/tasks.json index f9b5d352..232f6be2 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -30,7 +30,7 @@ "type": "shell", "command": "docker run -it -p 5050:5050 --rm --name futureporn-pgadmin -e PGADMIN_LISTEN_PORT=5050 -e PGADMIN_DISABLE_POSTFIX=1 -e PGADMIN_DEFAULT_EMAIL=cj@futureporn.net -e PGADMIN_DEFAULT_PASSWORD=password dpage/pgadmin4", "problemMatcher": [], - "isBackground": true, + "isBackground": true }, { "label": "Run Docker Compose", @@ -106,10 +106,23 @@ "runOn": "folderOpen" } }, + { + "label": "Run qBittorrent", + "type": "shell", + "command": "qbittorrent-nox --confirm-legal-notice --webui-port=8069 --profile=/home/cj/.config --configuration=futureporn", + "isBackground": true, + "options": { + "cwd": "${workspaceFolder}/services/worker" + }, + "runOptions": { + "runOn": "folderOpen" + }, + "problemMatcher": [] + }, { "label": "Run valkey", "type": "shell", - "command": "docker run --name futureporn-valkey --rm -p 6379:6379 valkey/valkey", + "command": "docker run --name futureporn-valkey-DEVELOPMENT --rm -p 6667:6379 valkey/valkey", "isBackground": true, "problemMatcher": [], "options": { @@ -124,7 +137,7 @@ "type": "shell", "command": "curl http://localhost:3000/task?title=fmv", "isBackground": false, - "problemMatcher": [], + "problemMatcher": [] } ] } \ No newline at end of file diff --git a/services/pocketbase/package.json b/services/pocketbase/package.json index 48f156ea..ad45e587 100644 --- a/services/pocketbase/package.json +++ b/services/pocketbase/package.json @@ -1,6 +1,6 @@ { "name": "futureporn", - "version": "3.4.5", + "version": "3.5.0", "private": true, "description": "Dedication to the preservation of lewdtuber history", "license": "Unlicense", diff --git a/services/pocketbase/pb_hooks/pages/(site)/+layout.ejs b/services/pocketbase/pb_hooks/pages/(site)/+layout.ejs index b68601bd..277c71bd 100644 --- a/services/pocketbase/pb_hooks/pages/(site)/+layout.ejs +++ b/services/pocketbase/pb_hooks/pages/(site)/+layout.ejs @@ -42,11 +42,11 @@ Account
- Logout + Logout
<% } else { %>
- Login + Login
<% } %> @@ -76,7 +76,20 @@
-
Hello World
+ <% if (!auth) { %> +
+

Hi thanks for visiting! 🤠

+

We are receiving a lot of traffic and are forced to restrict playback to logged in visitors.

+

To watch a vod, please log in.

+
+ <% } %> +
+

Torrent downloads are coming soon.

+
+
+

Futureporn is no longer using IPFS. Read more

+
+
diff --git a/services/pocketbase/pb_hooks/pages/(site)/auth/_private/oauth2.ejs b/services/pocketbase/pb_hooks/pages/(site)/auth/_private/oauth2.ejs index 45fa4e5d..0192c10b 100644 --- a/services/pocketbase/pb_hooks/pages/(site)/auth/_private/oauth2.ejs +++ b/services/pocketbase/pb_hooks/pages/(site)/auth/_private/oauth2.ejs @@ -1,8 +1,18 @@ \ No newline at end of file diff --git a/services/pocketbase/pb_hooks/pages/(site)/auth/oauth/confirm.ejs b/services/pocketbase/pb_hooks/pages/(site)/auth/oauth/confirm.ejs index 9e3c2c6e..c239daa5 100644 --- a/services/pocketbase/pb_hooks/pages/(site)/auth/oauth/confirm.ejs +++ b/services/pocketbase/pb_hooks/pages/(site)/auth/oauth/confirm.ejs @@ -6,11 +6,13 @@ let error = null let authData = null + let returnTo = request.cookies("returnTo") || '/account'; + try { authData = signInWithOAuth2(state, code) - dbg("the shit user id is " + authData.record.id) + if (!authData.record.id) { throw new Error('invalid authData. authData must contain a record.id'); } @@ -55,7 +57,8 @@ // The backend later removes the status if they aren't subscribed // let user = $app.findRecordById('user', ) - response.redirect('/') + dbg(`redirecting to ${returnTo}. cookies=${JSON.stringify(request.cookies(), null, 2)}`); + response.redirect(returnTo); } catch (e) { error = e.message } diff --git a/services/pocketbase/pb_hooks/pages/(site)/auth/oauth/login.ejs b/services/pocketbase/pb_hooks/pages/(site)/auth/oauth/login.ejs index baf7145c..4282b388 100644 --- a/services/pocketbase/pb_hooks/pages/(site)/auth/oauth/login.ejs +++ b/services/pocketbase/pb_hooks/pages/(site)/auth/oauth/login.ejs @@ -6,8 +6,10 @@ try { provider = body().provider const url = requestOAuth2Login(provider) - // console.log('url is as follows') - // console.log(url) + + const returnTo = params?.returnTo || '/'; + + response.redirect(url) } catch (error) { console.error("FAILURE! We failed to get the OAuth2Login from provider", provider) diff --git a/services/pocketbase/pb_hooks/pages/(site)/vods/[id]/index.ejs b/services/pocketbase/pb_hooks/pages/(site)/vods/[id]/index.ejs index 58a39e55..3ef06272 100644 --- a/services/pocketbase/pb_hooks/pages/(site)/vods/[id]/index.ejs +++ b/services/pocketbase/pb_hooks/pages/(site)/vods/[id]/index.ejs @@ -1,3 +1,21 @@ +<% if (!data?.user) { %> +
+ + + + + + + + + + For video playback options, please login + + + +
+<% } else { %> +
data-signals="{'selected':'cdn1'}" <% } else { %> data-signals="{'selected':'cdn2'}" <% } %>> @@ -20,7 +38,6 @@ <% } %>
- +<% } %>
diff --git a/services/pocketbase/pb_hooks/pages/vods/+middleware.js b/services/pocketbase/pb_hooks/pages/vods/+middleware.js new file mode 100644 index 00000000..97f4427e --- /dev/null +++ b/services/pocketbase/pb_hooks/pages/vods/+middleware.js @@ -0,0 +1,25 @@ +// +middleware.js +// load vods for the vod feed + +/** @type {import('pocketpages').PageDataLoaderFunc} */ +module.exports = function ({ params, response }, next) { + try { + const vods = $app.findRecordsByFilter('vods', null, '-streamDate', 25); + $app.expandRecords(vods, ["vtubers"], null); + next({ vods }); + + } catch (e) { + console.error('error!', e.message); + + if (e.message.match(/no rows/)) { + console.error('we are sending 404') + return response.html(404, 'VODs not found') + + } else { + console.error('we are sending error 500') + return response.html(500, 'Unknown internal error while fetching vods') + + } + } +}; + diff --git a/services/pocketbase/pb_public/apple-touch-icon-precomposed.png b/services/pocketbase/pb_public/apple-touch-icon-precomposed.png new file mode 100644 index 00000000..315b6eb1 Binary files /dev/null and b/services/pocketbase/pb_public/apple-touch-icon-precomposed.png differ diff --git a/services/pocketbase/utils/data_migrations/2025-11-16-rm-fake-sourceVideo.ts b/services/pocketbase/utils/data_migrations/2025-11-16-rm-fake-sourceVideo.ts new file mode 100644 index 00000000..bbaabe79 --- /dev/null +++ b/services/pocketbase/utils/data_migrations/2025-11-16-rm-fake-sourceVideo.ts @@ -0,0 +1,55 @@ +// 2025-11-16-rm-fake-sourceVideo.ts + +import PocketBase from 'pocketbase'; +import { readFileSync } from 'node:fs'; +import { basename, join } from 'node:path'; +import spawn from 'nano-spawn'; +import { tmpdir } from 'node:os'; +import mime from 'mime'; + +const pb = new PocketBase(process.env.POCKETBASE_URL || 'http://127.0.0.1:8090'); +if (!process.env.POCKETBASE_USERNAME) throw new Error('POCKETBASE_USERNAME missing'); +if (!process.env.POCKETBASE_PASSWORD) throw new Error('POCKETBASE_PASSWORD missing'); + +interface ManifestItem { + thumbnail_url: string; + thumbnail_key: string; + tmp_file: string; + vod_id: string; +} + +type Manifest = ManifestItem[]; + +interface Vod { + id: string; + thumbnail: string; + streamDate: string; +} + +const vodsCollectionName = 'pbc_144770472'; +// pbc_144770472 is the vods collection +// cv6m31vj98gmtsx is a sample vod id + + + +async function main() { + console.log('Authenticating with PocketBase...'); + await pb + .collection("_superusers") + .authWithPassword(process.env.POCKETBASE_USERNAME!, process.env.POCKETBASE_PASSWORD!); + + const vods = await pb.collection('vods').getFullList({ filter: 'sourceVideo ~ "content/"' }); + console.log(`${vods.length} vods.`); + + for (const [i, vod] of vods.entries()) { + console.log("processing vod " + i + "/" + vods.length + " " + vod.id); + await pb.collection('vods').update(vod.id, { sourceVideo: '' }); + } + + console.log("All done."); + + + +} + +main() \ No newline at end of file diff --git a/services/worker/.config/.gitignore b/services/worker/.config/.gitignore new file mode 100644 index 00000000..1be31049 --- /dev/null +++ b/services/worker/.config/.gitignore @@ -0,0 +1 @@ +qBittorrent.conf \ No newline at end of file diff --git a/services/worker/.config/bullmq.config.ts b/services/worker/.config/bullmq.config.ts index 9bceb4b9..0e1864c5 100644 --- a/services/worker/.config/bullmq.config.ts +++ b/services/worker/.config/bullmq.config.ts @@ -1,8 +1,9 @@ import { Queue as QueueMQ, Worker, type QueueOptions, type JobsOptions, type Job } from 'bullmq'; +import env from './env'; - +console.log(`using VALKEY_PORT=${env.VALKEY_PORT}`); export const connection: QueueOptions['connection'] = { host: '127.0.0.1', - port: 6379, + port: Number(env.VALKEY_PORT), password: '' }; diff --git a/services/worker/.config/env.ts b/services/worker/.config/env.ts index 3b9ab11b..14dad536 100644 --- a/services/worker/.config/env.ts +++ b/services/worker/.config/env.ts @@ -2,6 +2,8 @@ const env = (() => { if (!process.env.POCKETBASE_URL) throw new Error('POCKETBASE_URL missing in env'); if (!process.env.PORT) throw new Error('PORT missing in env'); + if (!process.env.WORKER_PORT) throw new Error('WORKER_PORT missing in env'); + if (!process.env.VALKEY_PORT) throw new Error('VALKEY_PORT missing in env'); if (!process.env.POCKETBASE_USERNAME) throw new Error('POCKETBASE_USERNAME missing in env'); if (!process.env.POCKETBASE_PASSWORD) throw new Error('POCKETBASE_PASSWORD missing in env'); if (!process.env.MUX_TOKEN_ID) throw new Error('MUX_TOKEN_ID missing in env'); @@ -16,11 +18,28 @@ const env = (() => { if (!process.env.AWS_SECRET_ACCESS_KEY) throw new Error('AWS_SECRET_ACCESS_KEY missing in env'); if (!process.env.AWS_REGION) throw new Error('AWS_REGION missing in env'); if (!process.env.AWS_ENDPOINT) throw new Error('AWS_ENDPOINT missing in env'); + if (!process.env.V1_AWS_BUCKET) throw new Error('V1_AWS_BUCKET missing in env'); + if (!process.env.V1_AWS_ACCESS_KEY_ID) throw new Error('V1_AWS_ACCESS_KEY_ID missing in env'); + if (!process.env.V1_AWS_SECRET_ACCESS_KEY) throw new Error('V1_AWS_SECRET_ACCESS_KEY missing in env'); + if (!process.env.V1_AWS_REGION) throw new Error('V1_AWS_REGION missing in env'); + if (!process.env.V1_AWS_ENDPOINT) throw new Error('V1_AWS_ENDPOINT missing in env'); if (!process.env.FANSLY_USERNAME) throw new Error('FANSLY_USERNAME missing in env'); if (!process.env.FANSLY_PASSWORD) throw new Error('FANSLY_PASSWORD missing in env'); + if (!process.env.APIFY_TOKEN) throw new Error('APIFY_TOKEN missing in env'); + if (!process.env.NODE_ENV) throw new Error('APIFY_TOKEN missing in env'); + if (!process.env.CACHE_ROOT) throw new Error('CACHE_ROOT missing in env'); + if (!process.env.SEEDBOX_SFTP_URL) throw new Error('SEEDBOX_SFTP_URL missing in env'); + if (!process.env.SEEDBOX_SFTP_USERNAME) throw new Error('SEEDBOX_SFTP_USERNAME missing in env'); + if (!process.env.SEEDBOX_SFTP_PASSWORD) throw new Error('SEEDBOX_SFTP_PASSWORD missing in env'); + if (!process.env.QBT_HOST) throw new Error('QBT_HOST missing in env'); + if (!process.env.QBT_PORT) throw new Error('QBT_PORT missing in env'); + if (!process.env.QBT_PASSWORD) throw new Error('QBT_PASSWORD missing in env'); + if (!process.env.QBT_USERNAME) throw new Error('QBT_USERNAME missing in env'); + const { PORT, + WORKER_PORT, POCKETBASE_URL, POCKETBASE_USERNAME, POCKETBASE_PASSWORD, @@ -36,11 +55,28 @@ const env = (() => { AWS_SECRET_ACCESS_KEY, AWS_REGION, AWS_ENDPOINT, + V1_AWS_BUCKET, + V1_AWS_ACCESS_KEY_ID, + V1_AWS_SECRET_ACCESS_KEY, + V1_AWS_REGION, + V1_AWS_ENDPOINT, FANSLY_USERNAME, FANSLY_PASSWORD, + APIFY_TOKEN, + NODE_ENV, + CACHE_ROOT, + SEEDBOX_SFTP_URL, + SEEDBOX_SFTP_USERNAME, + SEEDBOX_SFTP_PASSWORD, + VALKEY_PORT, + QBT_HOST, + QBT_USERNAME, + QBT_PASSWORD, + QBT_PORT, } = process.env return { PORT, + WORKER_PORT, POCKETBASE_URL, POCKETBASE_USERNAME, POCKETBASE_PASSWORD, @@ -56,8 +92,24 @@ const env = (() => { AWS_SECRET_ACCESS_KEY, AWS_REGION, AWS_ENDPOINT, + V1_AWS_BUCKET, + V1_AWS_ACCESS_KEY_ID, + V1_AWS_SECRET_ACCESS_KEY, + V1_AWS_REGION, + V1_AWS_ENDPOINT, FANSLY_PASSWORD, FANSLY_USERNAME, + APIFY_TOKEN, + NODE_ENV, + CACHE_ROOT, + SEEDBOX_SFTP_URL, + SEEDBOX_SFTP_USERNAME, + SEEDBOX_SFTP_PASSWORD, + VALKEY_PORT, + QBT_HOST, + QBT_USERNAME, + QBT_PASSWORD, + QBT_PORT, } })() diff --git a/services/worker/.config/qBittorrent/config/categories.json b/services/worker/.config/qBittorrent/config/categories.json new file mode 100644 index 00000000..2c63c085 --- /dev/null +++ b/services/worker/.config/qBittorrent/config/categories.json @@ -0,0 +1,2 @@ +{ +} diff --git a/services/worker/.config/qBittorrent/config/lockfile b/services/worker/.config/qBittorrent/config/lockfile new file mode 100644 index 00000000..e69de29b diff --git a/services/worker/.config/qBittorrent/config/rss/feeds.json b/services/worker/.config/qBittorrent/config/rss/feeds.json new file mode 100644 index 00000000..2c63c085 --- /dev/null +++ b/services/worker/.config/qBittorrent/config/rss/feeds.json @@ -0,0 +1,2 @@ +{ +} diff --git a/services/worker/.config/qBittorrent/config/rss/storage.lock b/services/worker/.config/qBittorrent/config/rss/storage.lock new file mode 100644 index 00000000..e69de29b diff --git a/services/worker/.config/qBittorrent/config/watched_folders.json b/services/worker/.config/qBittorrent/config/watched_folders.json new file mode 100644 index 00000000..2c63c085 --- /dev/null +++ b/services/worker/.config/qBittorrent/config/watched_folders.json @@ -0,0 +1,2 @@ +{ +} diff --git a/services/worker/.config/qBittorrent/data/GeoDB/dbip-country-lite.mmdb b/services/worker/.config/qBittorrent/data/GeoDB/dbip-country-lite.mmdb new file mode 100644 index 00000000..b817f052 Binary files /dev/null and b/services/worker/.config/qBittorrent/data/GeoDB/dbip-country-lite.mmdb differ diff --git a/services/worker/.config/qBittorrent/data/rss/articles/storage.lock b/services/worker/.config/qBittorrent/data/rss/articles/storage.lock new file mode 100644 index 00000000..e69de29b diff --git a/services/worker/.config/tsconfig.json b/services/worker/.config/tsconfig.json index bfa0fead..abd1626d 100644 --- a/services/worker/.config/tsconfig.json +++ b/services/worker/.config/tsconfig.json @@ -1,29 +1,30 @@ { "compilerOptions": { // Environment setup & latest features - "lib": ["ESNext"], + "lib": [ + "ESNext" + ], "target": "ESNext", "module": "Preserve", "moduleDetection": "force", - "jsx": "react-jsx", "allowJs": true, - + "types": [ + "node" + ], // Bundler mode "moduleResolution": "bundler", "allowImportingTsExtensions": true, "verbatimModuleSyntax": true, "noEmit": true, - // Best practices "strict": true, "skipLibCheck": true, "noFallthroughCasesInSwitch": true, "noUncheckedIndexedAccess": true, "noImplicitOverride": true, - // Some stricter flags (disabled by default) "noUnusedLocals": false, "noUnusedParameters": false, "noPropertyAccessFromIndexSignature": false } -} +} \ No newline at end of file diff --git a/services/worker/entrypoint.sh b/services/worker/entrypoint.sh deleted file mode 100755 index 3fbebd99..00000000 --- a/services/worker/entrypoint.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash - -/home/cj/.nvm/versions/node/v22.18.0/bin/node --import tsx ./src/index.ts \ No newline at end of file diff --git a/services/worker/package-lock.json b/services/worker/package-lock.json index 14f95667..3389c8da 100644 --- a/services/worker/package-lock.json +++ b/services/worker/package-lock.json @@ -12,6 +12,10 @@ "@mux/mux-node": "^12.8.0", "@types/express": "^5.0.5", "@types/js-yaml": "^4.0.9", + "@types/node": "^24.10.1", + "@types/semver": "^7.7.1", + "@types/ssh2": "^1.15.5", + "apify-client": "^2.19.0", "bullmq": "^5.63.0", "date-fns": "^4.1.0", "fs-extra": "^11.3.2", @@ -23,8 +27,10 @@ "puppeteer": "^24.30.0", "puppeteer-extra": "^3.3.6", "puppeteer-extra-plugin-stealth": "^2.11.2", + "semver": "^7.7.3", "sharp": "^0.34.5", "slugify": "^1.6.6", + "ssh2": "^1.17.0", "which": "^5.0.0" }, "devDependencies": { @@ -35,6 +41,32 @@ "typescript": "^5" } }, + "node_modules/@apify/consts": { + "version": "2.47.0", + "resolved": "https://registry.npmjs.org/@apify/consts/-/consts-2.47.0.tgz", + "integrity": "sha512-VFVrNn/S3bLPrS3v9x7Td5b8YtxPbrepuXLWu27n06T34qQeogysbnAWIBNnTQ59qRKlxrAeFR4ezI+fRXTMNQ==", + "license": "Apache-2.0" + }, + "node_modules/@apify/log": { + "version": "2.5.26", + "resolved": "https://registry.npmjs.org/@apify/log/-/log-2.5.26.tgz", + "integrity": "sha512-o/Ciki7UwaXwdJ+tvTg7WG8Q3C+hZXYUpo2FrTG7Upr1PQW45PGfwLDuJxteIBsEXjIHtWitlLi4AZwG7mtRMQ==", + "license": "Apache-2.0", + "dependencies": { + "@apify/consts": "^2.47.0", + "ansi-colors": "^4.1.1" + } + }, + "node_modules/@apify/utilities": { + "version": "2.23.0", + "resolved": "https://registry.npmjs.org/@apify/utilities/-/utilities-2.23.0.tgz", + "integrity": "sha512-WAyenlKvtXtvd6V8D2fYwbsmc3dMn3z02JaOhZNx/p8u0NuacNgoLk/+PW4IPWrNxhqgDQZqde4cpNfZOktvsQ==", + "license": "Apache-2.0", + "dependencies": { + "@apify/consts": "^2.47.0", + "@apify/log": "^2.5.26" + } + }, "node_modules/@babel/code-frame": { "version": "7.27.1", "resolved": "https://registry.npmjs.org/@babel/code-frame/-/code-frame-7.27.1.tgz", @@ -91,6 +123,18 @@ "@bull-board/api": "6.14.0" } }, + "node_modules/@crawlee/types": { + "version": "3.15.3", + "resolved": "https://registry.npmjs.org/@crawlee/types/-/types-3.15.3.tgz", + "integrity": "sha512-RvgVPXrsQw4GQIUXrC1z1aNOedUPJnZ/U/8n+jZ0fu1Iw9moJVMuiuIxSI8q1P6BA84aWZdalyfDWBZ3FMjsiw==", + "license": "Apache-2.0", + "dependencies": { + "tslib": "^2.4.0" + }, + "engines": { + "node": ">=16.0.0" + } + }, "node_modules/@emnapi/runtime": { "version": "1.7.0", "resolved": "https://registry.npmjs.org/@emnapi/runtime/-/runtime-1.7.0.tgz", @@ -1452,6 +1496,18 @@ "win32" ] }, + "node_modules/@sindresorhus/is": { + "version": "4.6.0", + "resolved": "https://registry.npmjs.org/@sindresorhus/is/-/is-4.6.0.tgz", + "integrity": "sha512-t09vSN3MdfsyCHoFcTRCH/iUtG7OJ0CsjzB8cjAmKc/va/kIgeDI/TxsigdncE/4be734m0cvIYwNaV4i2XqAw==", + "license": "MIT", + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sindresorhus/is?sponsor=1" + } + }, "node_modules/@standard-schema/spec": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/@standard-schema/spec/-/spec-1.0.0.tgz", @@ -1594,6 +1650,12 @@ "integrity": "sha512-hKormJbkJqzQGhziax5PItDUTMAM9uE2XXQmM37dyd4hVM+5aVl7oVxMVUiVQn2oCQFN/LKCZdvSM0pFRqbSmQ==", "license": "MIT" }, + "node_modules/@types/semver": { + "version": "7.7.1", + "resolved": "https://registry.npmjs.org/@types/semver/-/semver-7.7.1.tgz", + "integrity": "sha512-FmgJfu+MOcQ370SD0ev7EI8TlCAfKYU+B4m5T3yXc1CiRN94g/SZPtsCkk506aUDtlMnFZvasDwHHUcZUEaYuA==", + "license": "MIT" + }, "node_modules/@types/send": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/@types/send/-/send-1.2.1.tgz", @@ -1624,6 +1686,30 @@ "@types/node": "*" } }, + "node_modules/@types/ssh2": { + "version": "1.15.5", + "resolved": "https://registry.npmjs.org/@types/ssh2/-/ssh2-1.15.5.tgz", + "integrity": "sha512-N1ASjp/nXH3ovBHddRJpli4ozpk6UdDYIX4RJWFa9L1YKnzdhTlVmiGHm4DZnj/jLbqZpes4aeR30EFGQtvhQQ==", + "license": "MIT", + "dependencies": { + "@types/node": "^18.11.18" + } + }, + "node_modules/@types/ssh2/node_modules/@types/node": { + "version": "18.19.130", + "resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.130.tgz", + "integrity": "sha512-GRaXQx6jGfL8sKfaIDD6OupbIHBr9jv7Jnaml9tB7l4v068PAOXqfcujMMo5PhbIs6ggR1XODELqahT2R8v0fg==", + "license": "MIT", + "dependencies": { + "undici-types": "~5.26.4" + } + }, + "node_modules/@types/ssh2/node_modules/undici-types": { + "version": "5.26.5", + "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz", + "integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==", + "license": "MIT" + }, "node_modules/@types/yauzl": { "version": "2.10.3", "resolved": "https://registry.npmjs.org/@types/yauzl/-/yauzl-2.10.3.tgz", @@ -1808,6 +1894,15 @@ "node": ">= 8.0.0" } }, + "node_modules/ansi-colors": { + "version": "4.1.3", + "resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.3.tgz", + "integrity": "sha512-/6w/C21Pm1A7aZitlI5Ni/2J6FFQN8i1Cvz3kHABAAbw93v/NlvKdVOqz7CCWz/3iv/JplRSEEZ83XION15ovw==", + "license": "MIT", + "engines": { + "node": ">=6" + } + }, "node_modules/ansi-regex": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-5.0.1.tgz", @@ -1832,6 +1927,25 @@ "url": "https://github.com/chalk/ansi-styles?sponsor=1" } }, + "node_modules/apify-client": { + "version": "2.19.0", + "resolved": "https://registry.npmjs.org/apify-client/-/apify-client-2.19.0.tgz", + "integrity": "sha512-cDYwbygx/OplyF9MXTeb70nKwZHQQIp5OodsPeikWrh8sHmfeKWUu9jUUzeiWpHIPcwroYrP7KxA6UDSBGY3kQ==", + "license": "Apache-2.0", + "dependencies": { + "@apify/consts": "^2.42.0", + "@apify/log": "^2.2.6", + "@apify/utilities": "^2.18.0", + "@crawlee/types": "^3.3.0", + "agentkeepalive": "^4.2.1", + "async-retry": "^1.3.3", + "axios": "^1.6.7", + "content-type": "^1.0.5", + "ow": "^0.28.2", + "tslib": "^2.5.0", + "type-fest": "^4.0.0" + } + }, "node_modules/argparse": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz", @@ -1847,6 +1961,15 @@ "node": ">=0.10.0" } }, + "node_modules/asn1": { + "version": "0.2.6", + "resolved": "https://registry.npmjs.org/asn1/-/asn1-0.2.6.tgz", + "integrity": "sha512-ix/FxPn0MDjeyJ7i/yoHGFt/EX6LyNbxSEhPPXODPL+KB0VPk86UYfL0lMdy+KCnv+fmvIzySwaK5COwqVbWTQ==", + "license": "MIT", + "dependencies": { + "safer-buffer": "~2.1.0" + } + }, "node_modules/assertion-error": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-2.0.1.tgz", @@ -1875,10 +1998,30 @@ "integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==", "license": "MIT" }, + "node_modules/async-retry": { + "version": "1.3.3", + "resolved": "https://registry.npmjs.org/async-retry/-/async-retry-1.3.3.tgz", + "integrity": "sha512-wfr/jstw9xNi/0teMHrRW7dsz3Lt5ARhYNZ2ewpadnhaIp5mbALhOAP+EAdsC7t4Z6wqsDVv9+W6gm1Dk9mEyw==", + "license": "MIT", + "dependencies": { + "retry": "0.13.1" + } + }, "node_modules/asynckit": { "version": "0.4.0", "license": "MIT" }, + "node_modules/axios": { + "version": "1.13.2", + "resolved": "https://registry.npmjs.org/axios/-/axios-1.13.2.tgz", + "integrity": "sha512-VPk9ebNqPcy5lRGuSlKx752IlDatOjT9paPlm8A7yOuW2Fbvp4X3JznJtT4f0GzGLLiWE9W8onz51SqLYwzGaA==", + "license": "MIT", + "dependencies": { + "follow-redirects": "^1.15.6", + "form-data": "^4.0.4", + "proxy-from-env": "^1.1.0" + } + }, "node_modules/b4a": { "version": "1.7.3", "resolved": "https://registry.npmjs.org/b4a/-/b4a-1.7.3.tgz", @@ -1999,6 +2142,15 @@ "node": ">=10.0.0" } }, + "node_modules/bcrypt-pbkdf": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/bcrypt-pbkdf/-/bcrypt-pbkdf-1.0.2.tgz", + "integrity": "sha512-qeFIXtP4MSoi6NLqO12WfqARWWuCKi2Rn/9hJLEmtB5yTNr9DqFWkJRCf2qShWzPeAMRnOgCrq0sg/KLv5ES9w==", + "license": "BSD-3-Clause", + "dependencies": { + "tweetnacl": "^0.14.3" + } + }, "node_modules/body-parser": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/body-parser/-/body-parser-2.2.0.tgz", @@ -2037,6 +2189,15 @@ "node": "*" } }, + "node_modules/buildcheck": { + "version": "0.0.6", + "resolved": "https://registry.npmjs.org/buildcheck/-/buildcheck-0.0.6.tgz", + "integrity": "sha512-8f9ZJCUXyT1M35Jx7MkBgmBMo3oHTTBIPLiY9xyL0pl3T5RwcPEY8cUHr5LBNfu/fk6c2T4DJZuVM/8ZZT2D2A==", + "optional": true, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/bullmq": { "version": "5.63.0", "license": "MIT", @@ -2254,6 +2415,20 @@ } } }, + "node_modules/cpu-features": { + "version": "0.0.10", + "resolved": "https://registry.npmjs.org/cpu-features/-/cpu-features-0.0.10.tgz", + "integrity": "sha512-9IkYqtX3YHPCzoVg1Py+o9057a3i0fp7S530UWokCSaFVTc7CwXPRiOjRjBQQ18ZCNafx78YfnG+HALxtVmOGA==", + "hasInstallScript": true, + "optional": true, + "dependencies": { + "buildcheck": "~0.0.6", + "nan": "^2.19.0" + }, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/cron-parser": { "version": "4.9.0", "license": "MIT", @@ -2357,6 +2532,21 @@ "integrity": "sha512-vhE6eymDQSKWUXwwA37NtTTVEzjtGVfDr3pRbsWEQ5onH/Snp2c+2xZHWJJawG/0hCCJLRGt4xVtEVUVILol4w==", "license": "BSD-3-Clause" }, + "node_modules/dot-prop": { + "version": "6.0.1", + "resolved": "https://registry.npmjs.org/dot-prop/-/dot-prop-6.0.1.tgz", + "integrity": "sha512-tE7ztYzXHIeyvc7N+hR3oi7FIbf/NIjVP9hmAt3yMXzrQ072/fpjGLx2GxNxGxUl5V73MEqYzioOMoVhGMJ5cA==", + "license": "MIT", + "dependencies": { + "is-obj": "^2.0.0" + }, + "engines": { + "node": ">=10" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/dunder-proto": { "version": "1.0.1", "license": "MIT", @@ -2778,6 +2968,26 @@ "integrity": "sha512-MI1qs7Lo4Syw0EOzUl0xjs2lsoeqFku44KpngfIduHBYvzm8h2+7K8YMQh1JtVVVrUvhLpNwqVi4DERegUJhPQ==", "license": "Apache-2.0" }, + "node_modules/follow-redirects": { + "version": "1.15.11", + "resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.11.tgz", + "integrity": "sha512-deG2P0JfjrTxl50XGCDyfI97ZGVCxIpfKYmfyrQ54n5FO/0gfIES8C/Psl6kWVDolizcaaxZJnTS0QSMxvnsBQ==", + "funding": [ + { + "type": "individual", + "url": "https://github.com/sponsors/RubenVerborgh" + } + ], + "license": "MIT", + "engines": { + "node": ">=4.0" + }, + "peerDependenciesMeta": { + "debug": { + "optional": true + } + } + }, "node_modules/for-in": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/for-in/-/for-in-1.0.2.tgz", @@ -3243,6 +3453,15 @@ "node": ">=8" } }, + "node_modules/is-obj": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/is-obj/-/is-obj-2.0.0.tgz", + "integrity": "sha512-drqDG3cbczxxEJRoOXcOjtdp1J/lyp1mNn0xaznRs8+muBhgQcrnbspox5X5fOw0HnMnbfDzvnEMEtqDEJEo8w==", + "license": "MIT", + "engines": { + "node": ">=8" + } + }, "node_modules/is-plain-object": { "version": "2.0.4", "resolved": "https://registry.npmjs.org/is-plain-object/-/is-plain-object-2.0.4.tgz", @@ -3380,6 +3599,13 @@ "version": "3.1.0", "license": "MIT" }, + "node_modules/lodash.isequal": { + "version": "4.5.0", + "resolved": "https://registry.npmjs.org/lodash.isequal/-/lodash.isequal-4.5.0.tgz", + "integrity": "sha512-pDo3lu8Jhfjqls6GkMgpahsF9kCyayhgykjyLMNFTKWrpVdAQtYyB4muAMWozBB4ig/dtWAmsMxLEI8wuz+DYQ==", + "deprecated": "This package is deprecated. Use require('node:util').isDeepStrictEqual instead.", + "license": "MIT" + }, "node_modules/long": { "version": "5.3.2", "resolved": "https://registry.npmjs.org/long/-/long-5.3.2.tgz", @@ -3542,6 +3768,13 @@ "@msgpackr-extract/msgpackr-extract-win32-x64": "3.0.3" } }, + "node_modules/nan": { + "version": "2.23.1", + "resolved": "https://registry.npmjs.org/nan/-/nan-2.23.1.tgz", + "integrity": "sha512-r7bBUGKzlqk8oPBDYxt6Z0aEdF1G1rwlMcLk8LCOMbOzf0mG+JUfUzG4fIMWwHWP0iyaLWEQZJmtB7nOHEm/qw==", + "license": "MIT", + "optional": true + }, "node_modules/nano-spawn": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/nano-spawn/-/nano-spawn-2.0.0.tgz", @@ -3695,6 +3928,25 @@ "protobufjs": "^7.2.4" } }, + "node_modules/ow": { + "version": "0.28.2", + "resolved": "https://registry.npmjs.org/ow/-/ow-0.28.2.tgz", + "integrity": "sha512-dD4UpyBh/9m4X2NVjA+73/ZPBRF+uF4zIMFvvQsabMiEK8x41L3rQ8EENOi35kyyoaJwNxEeJcP6Fj1H4U409Q==", + "license": "MIT", + "dependencies": { + "@sindresorhus/is": "^4.2.0", + "callsites": "^3.1.0", + "dot-prop": "^6.0.1", + "lodash.isequal": "^4.5.0", + "vali-date": "^1.0.0" + }, + "engines": { + "node": ">=12" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/pac-proxy-agent": { "version": "7.2.0", "resolved": "https://registry.npmjs.org/pac-proxy-agent/-/pac-proxy-agent-7.2.0.tgz", @@ -4256,6 +4508,15 @@ "url": "https://github.com/privatenumber/resolve-pkg-maps?sponsor=1" } }, + "node_modules/retry": { + "version": "0.13.1", + "resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz", + "integrity": "sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==", + "license": "MIT", + "engines": { + "node": ">= 4" + } + }, "node_modules/rimraf": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/rimraf/-/rimraf-3.0.2.tgz", @@ -4358,6 +4619,8 @@ }, "node_modules/semver": { "version": "7.7.3", + "resolved": "https://registry.npmjs.org/semver/-/semver-7.7.3.tgz", + "integrity": "sha512-SdsKMrI9TdgjdweUSR9MweHA4EJ8YxHn8DFaDisvhVlUOe4BF1tLD7GAj0lIqWVl+dPb/rExr0Btby5loQm20Q==", "license": "ISC", "bin": { "semver": "bin/semver.js" @@ -4656,6 +4919,23 @@ "node": ">=0.10.0" } }, + "node_modules/ssh2": { + "version": "1.17.0", + "resolved": "https://registry.npmjs.org/ssh2/-/ssh2-1.17.0.tgz", + "integrity": "sha512-wPldCk3asibAjQ/kziWQQt1Wh3PgDFpC0XpwclzKcdT1vql6KeYxf5LIt4nlFkUeR8WuphYMKqUA56X4rjbfgQ==", + "hasInstallScript": true, + "dependencies": { + "asn1": "^0.2.6", + "bcrypt-pbkdf": "^1.0.2" + }, + "engines": { + "node": ">=10.16.0" + }, + "optionalDependencies": { + "cpu-features": "~0.0.10", + "nan": "^2.23.0" + } + }, "node_modules/stackback": { "version": "0.0.2", "resolved": "https://registry.npmjs.org/stackback/-/stackback-0.0.2.tgz", @@ -4832,6 +5112,24 @@ "fsevents": "~2.3.3" } }, + "node_modules/tweetnacl": { + "version": "0.14.5", + "resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-0.14.5.tgz", + "integrity": "sha512-KXXFFdAbFXY4geFIwoyNK+f5Z1b7swfXABfL7HXCmoIWMKU3dmS26672A4EeQtDzLKy7SXmfBu51JolvEKwtGA==", + "license": "Unlicense" + }, + "node_modules/type-fest": { + "version": "4.41.0", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.41.0.tgz", + "integrity": "sha512-TeTSQ6H5YHvpqVwBRcnLDCBnDOHWYu7IvGbHT6N8AOymcr9PJGjc1GTtiWZTYg0NCgYwvnYWEkVChQAr9bjfwA==", + "license": "(MIT OR CC0-1.0)", + "engines": { + "node": ">=16" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "node_modules/type-is": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/type-is/-/type-is-2.0.1.tgz", @@ -4920,6 +5218,15 @@ "uuid": "dist/esm/bin/uuid" } }, + "node_modules/vali-date": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/vali-date/-/vali-date-1.0.0.tgz", + "integrity": "sha512-sgECfZthyaCKW10N0fm27cg8HYTFK5qMWgypqkXMQ4Wbl/zZKx7xZICgcoxIIE+WFAP/MBL2EFwC/YvLxw3Zeg==", + "license": "MIT", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/vary": { "version": "1.1.2", "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", diff --git a/services/worker/package.json b/services/worker/package.json index 59ad5845..153e9e1b 100644 --- a/services/worker/package.json +++ b/services/worker/package.json @@ -12,6 +12,10 @@ "@mux/mux-node": "^12.8.0", "@types/express": "^5.0.5", "@types/js-yaml": "^4.0.9", + "@types/node": "^24.10.1", + "@types/semver": "^7.7.1", + "@types/ssh2": "^1.15.5", + "apify-client": "^2.19.0", "bullmq": "^5.63.0", "date-fns": "^4.1.0", "fs-extra": "^11.3.2", @@ -23,8 +27,10 @@ "puppeteer": "^24.30.0", "puppeteer-extra": "^3.3.6", "puppeteer-extra-plugin-stealth": "^2.11.2", + "semver": "^7.7.3", "sharp": "^0.34.5", "slugify": "^1.6.6", + "ssh2": "^1.17.0", "which": "^5.0.0" }, "devDependencies": { diff --git a/services/worker/src/fixtures/pizza.avif b/services/worker/src/fixtures/pizza.avif new file mode 100644 index 00000000..02c2f9dd Binary files /dev/null and b/services/worker/src/fixtures/pizza.avif differ diff --git a/services/worker/src/fixtures/ubuntu-24.04.3-desktop-amd64.iso.torrent b/services/worker/src/fixtures/ubuntu-24.04.3-desktop-amd64.iso.torrent new file mode 100644 index 00000000..516328e1 Binary files /dev/null and b/services/worker/src/fixtures/ubuntu-24.04.3-desktop-amd64.iso.torrent differ diff --git a/services/worker/src/index.ts b/services/worker/src/index.ts index 970a8276..83668aeb 100644 --- a/services/worker/src/index.ts +++ b/services/worker/src/index.ts @@ -7,7 +7,7 @@ import { generalQueue } from './queues/generalQueue.ts'; import { gpuQueue } from './queues/gpuQueue.ts'; import { highPriorityQueue } from './queues/highPriorityQueue.ts'; import env from '../.config/env.ts'; - +import { version } from '../package.json'; const run = async () => { @@ -34,12 +34,52 @@ const run = async () => { await import('./workers/generalWorker.ts'); await import('./workers/gpuWorker.ts'); } else { - // @todo I separated these so that they can be ran on multiple machines + // @todo I separated these so that they can be ran on multiple machines. + // @todo we should activate these based on environment variable feature flags, not NODE_ENV await import('./workers/highPriorityWorker.ts'); await import('./workers/generalWorker.ts'); // await import('./workers/gpuWorker.ts'); // @todo implement } + app.get('/', (req: Request, res: Response) => { + res.send(` + + +

FP Worker version ${version}.

+

Actions

+

If you find yourself clicking one of these links a lot, consider automating it by putting the job in a queue on a every or pattern timer.

+ + `) + }) + app.use('/ui', serverAdapter.getRouter()); app.get('/task', async (req: Request, res: Response) => { @@ -51,26 +91,23 @@ const run = async () => { const data = { vodId }; switch (name) { - case 'presignMuxAssets': - case 'syncronizePatreon': - case 'analyzeAudio': - await generalQueue.add(name, data); - break; + case '': + throw new Error('job name was missing'); case 'scheduleVodProcessing': await gpuQueue.add(name, data); break; default: - throw new Error(`Unknown job name: ${name}`); + await highPriorityQueue.add(name, data); + break; } res.json({ ok: true }); }); - app.listen(env.PORT, () => { - console.log(`Bull Dashboard running at http://localhost:${env.PORT}/ui`); - console.log('To populate the queue, run ex:'); - console.log(` curl http://localhost:${env.PORT}/task?name=presignMuxAssets`); + app.listen(env.WORKER_PORT, () => { + console.log(`Bull Dashboard running at http://localhost:${env.WORKER_PORT}/ui`); + console.log(`Actions Menu: http://localhost:${env.WORKER_PORT}`); }); diff --git a/services/worker/src/processors/_Template.ts b/services/worker/src/processors/_Template.ts new file mode 100644 index 00000000..b318e5c8 --- /dev/null +++ b/services/worker/src/processors/_Template.ts @@ -0,0 +1,64 @@ +import { Job } from "bullmq"; +import { getPocketBaseClient } from "../util/pocketbase"; +import Client from "pocketbase"; +import { Vod } from "../types"; + + +const foo = 'bar'; + +/** + * barFunction + * + * the function that does the actual work. exported so it can be tested by vitest + */ +export async function barFunction(job: Job, vod: Vod) { + job.log('doing the task'); + // @todo insert real logic here + return `${foo}-${vod.id}`; +} + +/** + * getApplicableVods + * + * gets the vods from the db using filters which apply to this task + */ +async function getApplicableVods(pb: Client) { + const results = await pb.collection('vods').getList(1, 3, { + filter: "videoSrcB2 != '' && magnetLink = ''" + }) + const vods = results.items; + + return vods; +} +/** + * + * aTemplate + * + * A template to copy for when we make new processors. Shows the general scaffolding. + * + * Remember to makes processors + * * idempotent + * * fail fast + * * DRY + */ +export async function aTemplate(job: Job) { + + const pb = await getPocketBaseClient(); + const vods = await getApplicableVods(pb); + + job.log(`getAnnounceUrlDetails found ${vods.length} vods in need of a streamDate.`) + + for (let i = 0; i < vods.length; i++) { + const vod = vods[i] as unknown as Vod; + + const newlyGeneratedData = barFunction(job, vod); + + await pb.collection('vods').update(vod.id, { + newlyGeneratedData + }) + + // Optionally update progress for bull UI + const progress = Math.round(((i + 1) / vods.length) * 100); + await job.updateProgress(progress); + } +} diff --git a/services/worker/src/processors/analyzeAudio.ts b/services/worker/src/processors/analyzeAudio.ts index 57fc7ae4..766d5c7d 100644 --- a/services/worker/src/processors/analyzeAudio.ts +++ b/services/worker/src/processors/analyzeAudio.ts @@ -80,7 +80,7 @@ function parseEbur128(output: string): AudioStats { } -async function analyzeAudio(inputFile: string): Promise { +async function __analyzeAudio(inputFile: string): Promise { const args = [ "-hide_banner", "-i", inputFile, @@ -98,7 +98,7 @@ function assertPayload(payload: any): asserts payload is Payload { if (typeof payload.vodId !== "string") throw new Error("invalid payload-- was missing vodId"); } -export default async function main(job: Job) { +export async function analyzeAudio(job: Job) { // job.log('TIME TO ANALYZING AUDIO'); const payload = job.data; assertPayload(payload); @@ -122,7 +122,7 @@ export default async function main(job: Job) { await b2Download(vod.sourceVideo, videoFilePath); - const results = await analyzeAudio(videoFilePath); + const results = await __analyzeAudio(videoFilePath); job.log(`results=${JSON.stringify(results)}`); await vod.collection('vods').update(vodId, { diff --git a/services/worker/src/processors/copyV1VideoAll.ts b/services/worker/src/processors/copyV1VideoAll.ts new file mode 100644 index 00000000..decbeac8 --- /dev/null +++ b/services/worker/src/processors/copyV1VideoAll.ts @@ -0,0 +1,45 @@ +import { Job } from "bullmq"; +import { getPocketBaseClient } from "../util/pocketbase"; +import Client from "pocketbase"; +import { Vod } from "../types"; +import { generalQueue } from '../queues/generalQueue.ts'; + + +/** + * getApplicableVods + * + * gets the vods that are missing sourceVideo + */ +async function getApplicableVods(pb: Client) { + const results = await pb + .collection('vods') + .getFullList({ filter: "videoSrcB2 != '' && sourceVideo = ''" }); + const vods = results; + return vods; +} + + +/** + * + * copyV1VideoAll + * + * Copy all vod.videoSrcB2 from V1_AWS_BUCKET to vod.sourceVideo in AWS_BUCKET + */ +export async function copyV1VideoAll(job: Job) { + + const pb = await getPocketBaseClient(); + const vods = await getApplicableVods(pb); + + job.log(`copyV1VideoAll found ${vods.length} elligible vods.`) + + for (let i = 0; i < vods.length; i++) { + const vod = vods[i] as unknown as Vod; + + await generalQueue.add('copyV1VideoToV3', { + vodId: vod.id + }) + + } + + return { jobsQueued: vods.length } +} diff --git a/services/worker/src/processors/copyV1VideoToV3.spec.ts b/services/worker/src/processors/copyV1VideoToV3.spec.ts new file mode 100644 index 00000000..c5e15ec7 --- /dev/null +++ b/services/worker/src/processors/copyV1VideoToV3.spec.ts @@ -0,0 +1,37 @@ +import { RecordModel } from "pocketbase"; +import { getTweetDates, getTweetId, tweetIdToDate } from "./getAnnounceUrlDetails"; +import { test, expect, describe, vi } from 'vitest'; +import { Job } from "bullmq"; +import { copyBetweenBuckets } from "./copyV1VideoToV3"; + +describe('copyV1VideoToV3 integration', () => { + test("copyBetweenBuckets", async () => { + + const dummyJob = { + name: 'test-copy-v1-video-to-v3', + log: vi.fn((arg) => { + console.log('dummyJob.log:', arg); + return; + }), + updateProgress: vi.fn().mockResolvedValue(undefined), + } as unknown as Job; + + + const dummyVod: RecordModel = { + id: '1234', + collectionId: 'qwertyuiop', + collectionName: 'vods', + videoSource: '', + videoSrcB2: 'https://futureporn-b2.b-cdn.net/projektmelody-fansly-2025-11-12.mp4', + } + + const output = await copyBetweenBuckets(dummyJob, dummyVod); + + expect(output).toBe('qwertyuiop/1234/projektmelody-fansly-2025-11-12.mp4'); + + expect(dummyJob.log).toHaveBeenCalled(); + + }, 1000 * 60 * 10); + + +}) diff --git a/services/worker/src/processors/copyV1VideoToV3.ts b/services/worker/src/processors/copyV1VideoToV3.ts new file mode 100644 index 00000000..7c6f5c5c --- /dev/null +++ b/services/worker/src/processors/copyV1VideoToV3.ts @@ -0,0 +1,79 @@ +import { Job } from "bullmq"; +import { getPocketBaseClient } from "../util/pocketbase"; +import Client, { RecordModel } from "pocketbase"; +import { Vod } from "../types"; +import { basename } from 'node:path'; +import env from "../../.config/env"; +import spawn from 'nano-spawn'; + +const foo = 'bar'; + +/** + * barFunction + * + * the function that does the actual work. exported so it can be tested by vitest + */ +export async function copyBetweenBuckets(job: Job, vod: RecordModel) { + + + const s3KeyTarget = `${vod.collectionId}/${vod.id}/${basename(vod.videoSrcB2)}`; + const from = `b2://${env.V1_AWS_BUCKET}/${vod.videoSrcB2.split('/').at(-1)}`; + const toPrefix = `b2://${env.AWS_BUCKET}`; + const to = `${toPrefix}/${s3KeyTarget}`; + job.log(`Copying ${from} ${to}. This is a slow process, please be patient.`); + + + // @see https://github.com/sindresorhus/nano-spawn?tab=readme-ov-file#subprocesssymbolasynciterator + // unfortunately `b2 file server-side-copy` doesn't have any progress indicator in stdout or stderr. + // so we just have to fire and forget (and wait) + const subprocess = spawn('b2', ['file', 'server-side-copy', from, to]); + + await Promise.all([ + (async () => { + for await (const line of subprocess.stdout) { + await job.log(`stdout: ${line}`); + } + })(), + (async () => { + for await (const line of subprocess.stderr) { + await job.log(`stderr: ${line}`); + } + })(), + ]); + + return to.replace(toPrefix + '/', ''); +} + + +/** + * + * copyV1VideoToV3 + * + * Copy vod.videoSrcB2 from V1 S3 bucket to V3 S3 bucket + * + * Remember to makes processors + * * idempotent + * * fail fast + * * DRY + */ +export async function copyV1VideoToV3(job: Job) { + + const vodId = job.data?.vodId; + if (!vodId) throw new Error('vodId was missing from input data'); + + const pb = await getPocketBaseClient(); + const vod = await pb.collection('vods').getOne(job.data.vodId) + + + const sourceVideo = await copyBetweenBuckets(job, vod); + await pb.collection('vods').update(vod.id, { + sourceVideo + }); + + return { + vodId, + sourceVideo + } + + +} diff --git a/services/worker/src/processors/copyV2ThumbToV3.ts b/services/worker/src/processors/copyV2ThumbToV3.ts new file mode 100644 index 00000000..1a2bfea2 --- /dev/null +++ b/services/worker/src/processors/copyV2ThumbToV3.ts @@ -0,0 +1,25 @@ +import { type Job } from 'bullmq'; +import { getPocketBaseClient } from '../util/pocketbase'; + + + + +export async function copyV2ThumbToV3(job: Job) { + + const pb = await getPocketBaseClient(); + + job.log(`the job '${job.name}' is running`); + + + + const vods = await pb.collection('vods').getFullList({ + filter: 'thumbnail ~ ".png"', + sort: '-streamDate', + }); + + // console.log('vods as follows', JSON.stringify(vods)); + // console.log(vods[0]); + + + return { complete: 'kindasorta' }; +} \ No newline at end of file diff --git a/services/worker/src/processors/createMagnetLink.spec.ts b/services/worker/src/processors/createMagnetLink.spec.ts new file mode 100644 index 00000000..28e95598 --- /dev/null +++ b/services/worker/src/processors/createMagnetLink.spec.ts @@ -0,0 +1,16 @@ +import { getTweetDates, getTweetId, tweetIdToDate } from "./getAnnounceUrlDetails"; +import { test, expect, describe } from 'vitest'; + +describe('createMagnetLink integration', () => { + test("", async () => { + + // for (const datum of data) { + + // expect(datum.date).toBeTruthy(); + // expect(new Date(datum.date)).toStrictEqual(new Date(datum.expectedDate)); + + // } + }, 30000); + + +}) diff --git a/services/worker/src/processors/createMagnetLink.ts b/services/worker/src/processors/createMagnetLink.ts new file mode 100644 index 00000000..dc5c83d8 --- /dev/null +++ b/services/worker/src/processors/createMagnetLink.ts @@ -0,0 +1,74 @@ +import { Job } from "bullmq"; +import { getPocketBaseClient } from "../util/pocketbase"; +import type { Vod } from "../types"; + +const TWEET_URL_REGEX = + /^https?:\/\/(twitter\.com|x\.com)\/[A-Za-z0-9_]{1,15}\/status\/(\d+)(\?.*)?$/; + +export function tweetIdToDate(id: string | number): Date { + const snowflake = BigInt(id); + const timestamp = (snowflake >> 22n) + 1288834974657n; + return new Date(Number(timestamp)); +} + +export function getTweetId(url: string) { + if (!TWEET_URL_REGEX.test(url)) { + throw new Error(`Invalid tweet URL: ${url}`); + } + return url.split('/').at(-1)!; +} + +export function getTweetDates(tweetUrls: string[]): Date[] { + + tweetUrls.forEach((url) => { + if (!TWEET_URL_REGEX.test(url)) { + throw new Error(`Invalid tweet URL: ${url}`); + } + }); + + return tweetUrls + .map(url => url.split('/').at(-1)!) // add ! if you’re sure it exists + .map(tweetID => tweetIdToDate(tweetID)); +} + +export async function getApplicableVods() { + const pb = await getPocketBaseClient() + + const results = await pb.collection('vods').getList(1, 3, { + filter: "videoSrcB2 != '' && magnetLink = ''" + }) + const vods = results.items; + + return vods; +} + +/** + * + * createMagnetLink + * + * given a vod, scrape the tweet and populate the streamDate + */ +export async function createMagnetLinks(job: Job) { + + const pb = await getPocketBaseClient(); + const vods = await getApplicableVods(); + + job.log(`getAnnounceUrlDetails found ${vods.length} vods in need of a streamDate.`) + + for (let i = 0; i < vods.length; i++) { + const vod = vods[i]; + const magnetLink = await createMagnetLink(vod.videoSrcB2); + + await pb.collection('vods').update(vod.id, { + magnetLink + }) + + const progress = Math.round(((i + 1) / vods.length) * 100); + await job.updateProgress(progress); + } +} + + +export async function createMagnetLink(vod: Vod) { + +} diff --git a/services/worker/src/processors/createMuxAsset.ts b/services/worker/src/processors/createMuxAsset.ts new file mode 100644 index 00000000..ace868b4 --- /dev/null +++ b/services/worker/src/processors/createMuxAsset.ts @@ -0,0 +1,86 @@ +import { Job } from "bullmq"; +import { getPocketBaseClient } from "../util/pocketbase"; +import Client, { RecordModel } from "pocketbase"; +import { Vod } from "../types"; + + +interface MuxAssetCreationResponse { + data: { + video_quality: string; + status: string; + progress: { + state: string; + progress: number; + }; + playback_ids: Array<{ + policy: string; + id: string; + }>; + mp4_support: string; + max_resolution_tier: string; + master_access: string; + ingest_type: string; + id: string; + encoding_tier: string; + created_at: string; + }; +} + +/** + * + * create a mux asset on Mux.com + */ +export async function __createMuxAsset(vod: RecordModel) { + + const { videoSrcB2, muxAssetId, muxPlaybackId } = vod; + if (!videoSrcB2) throw new Error(`vod ${vod.id} was missing videoSrcB2`); + + if (muxAssetId !== '') throw new Error('this vod already has a muxAssetId'); + if (muxPlaybackId !== '') throw new Error('this vod already has a muxPlaybackId'); + + const res = await fetch('https://api.mux.com/video/v1/assets', { + body: JSON.stringify({ + "input": videoSrcB2, + "playback_policy": [ + "signed" + ] + }) + }) + + if (!res.ok) { + const body = await res.text(); + throw new Error(`Mux API error: ${res.status} ${res.statusText} ${body}`); + } + + const payload = await res.json() as unknown as MuxAssetCreationResponse; + if (!payload.data.status) { + throw new Error('invalid response data received from Mux--' + JSON.stringify(payload)); + } + + const playbackIdEntry = payload.data.playback_ids.find((id) => id.policy === 'signed'); + if (!playbackIdEntry) throw new Error('failed to find a playback_id with signed policy'); + const playbackId = playbackIdEntry.id; + + const assetId = payload.data.id; + + + return { playbackId, assetId }; +} + +export async function createMuxAsset(job: Job) { + + const pb = await getPocketBaseClient(); + + const vodId = job.data?.vodId; + + if (!vodId) throw new Error('vodId is missing in the job data.'); + const vod = await pb.collection('vods').getOne(vodId); + + job.log(`createMuxAsset for vod ${vodId}.`) + + const { assetId, playbackId } = (await __createMuxAsset(vod)); + job.log(`Created assetId=${assetId}, playbackId=${playbackId}`); + pb.collection('vods').update(vodId, { muxAssetId: assetId, muxPlaybackId: playbackId }); + job.log('Vod record updated. All done.'); + +} diff --git a/services/worker/src/processors/createTorrent.ts b/services/worker/src/processors/createTorrent.ts index 67969b52..595f7337 100644 --- a/services/worker/src/processors/createTorrent.ts +++ b/services/worker/src/processors/createTorrent.ts @@ -20,21 +20,18 @@ -import type { Helpers } from "graphile-worker"; -import { PrismaClient } from "../../generated/prisma"; -import { withAccelerate } from "@prisma/extension-accelerate"; -import { getOrDownloadAsset } from "../utils/cache"; -import { env } from "../config/env"; -import { getS3Client, uploadFile } from "../utils/s3"; +import env from "../../.config/env"; +import { sshClient } from "../util/sftp"; +import { qbtClient, QBTorrentInfo } from "../util/qbittorrent"; +import { Job } from "bullmq"; +import { getPocketBaseClient } from "../util/pocketbase"; +import spawn from "nano-spawn"; +import { join, basename } from 'node:path'; +import { tmpdir } from "node:os"; import { nanoid } from "nanoid"; -import { getNanoSpawn } from "../utils/nanoSpawn"; -import logger from "../utils/logger"; -import { basename, join } from "node:path"; -import { generateS3Path } from "../utils/formatters"; -import { sshClient } from "../utils/sftp"; -import { qbtClient } from "../utils/qbittorrent/qbittorrent"; +import { formatDate } from "date-fns"; + -const prisma = new PrismaClient().$extends(withAccelerate()); interface Payload { @@ -55,7 +52,7 @@ function assertPayload(payload: any): asserts payload is Payload { // const spawn = await getNanoSpawn() // const torrentFilePath = join(env.CACHE_ROOT, `${nanoid()}.torrent`); -// logger.debug('creating torrent & magnet link via imdl'); +// job.log('creating torrent & magnet link via imdl'); // const result = await spawn('imdl', [ // 'torrent', @@ -78,7 +75,7 @@ function assertPayload(payload: any): asserts payload is Payload { // } // const magnetLink = match[0]; -// logger.debug(`Magnet link=${magnetLink}`); +// job.log(`Magnet link=${magnetLink}`); // return { // magnetLink, @@ -92,10 +89,10 @@ async function createQBittorrentTorrent( videoFilePath: string ): Promise<{ magnetLink: string, - torrentFilePath: string + torrentFilePath: string, + info: QBTorrentInfo, }> { - const torrentInfo = await qbtClient.createTorrent(videoFilePath); - return torrentInfo + return qbtClient.createTorrent(videoFilePath); } // async function createTorrentfileTorrent( @@ -117,7 +114,7 @@ async function createQBittorrentTorrent( // const torrentFilePath = join(env.CACHE_ROOT, `${nanoid()}.torrent`); -// logger.debug('creating torrent & magnet link') +// job.log('creating torrent & magnet link') // const result = await spawn('torrentfile', [ // 'create', // '--magnet', @@ -140,7 +137,7 @@ async function createQBittorrentTorrent( // } // const magnetLink = match[0]; -// logger.debug(`Magnet link=${magnetLink}`); +// job.log(`Magnet link=${magnetLink}`); // return { // magnetLink, @@ -150,30 +147,30 @@ async function createQBittorrentTorrent( // } -async function uploadTorrentToSeedbox(videoFilePath: string, torrentFilePath: string) { +async function uploadTorrentToSeedbox(job: Job, videoFilePath: string, torrentFilePath: string) { + job.log(`Uploading ${videoFilePath} to seedbox...`); await sshClient.uploadFile(videoFilePath, './data'); + + job.log(`Uploading ${torrentFilePath} to seedbox...`); await sshClient.uploadFile(torrentFilePath, './watch'); } -export default async function main(payload: any, helpers: Helpers) { +export async function createTorrent(job: Job) { + const payload = job.data assertPayload(payload) const { vodId } = payload - const vod = await prisma.vod.findFirstOrThrow({ - where: { - id: vodId - } - }) + const pb = await getPocketBaseClient(); + const vod = await pb.collection('vods').getOne(vodId, { expand: 'vtubers' }); - // const spawn = await getNanoSpawn(); // * [x] load vod - // * [x] exit if video.thumbnail already defined + // * [x] exit if video.magnetLink already defined if (vod.magnetLink) { - logger.info(`Doing nothing-- vod ${vodId} already has a magnet link.`) + job.log(`Doing nothing-- vod ${vodId} already has a magnet link.`) return; // Exit the function early } @@ -182,25 +179,29 @@ export default async function main(payload: any, helpers: Helpers) { } - logger.info('Creating torrent.') - const s3Client = getS3Client() + job.log('Creating torrent.'); + + // we gotta put the download in a place that qbittorrent docker container can access it + const formattedDate = formatDate(vod.streamDate, 'yyyy-MM-dd'); + const vtubers = vod?.expand?.vtubers?.map((vt: { displayName: string, slug: string }) => vt.slug).join('-'); + const videoFilePath = join(tmpdir(), `${formattedDate}-${vtubers}-${vod.id}.mp4`); // * [x] download video segments from pull-thru cache - const videoFilePath = await getOrDownloadAsset(s3Client, env.S3_BUCKET, vod.sourceVideo) - logger.debug(`videoFilePath=${videoFilePath}`) + const dlFrom = `b2://${env.AWS_BUCKET}/${vod.sourceVideo}`; + job.log(`downloading ${dlFrom} to ${videoFilePath}`); + await spawn('b2', ['file', 'download', dlFrom, videoFilePath]); - const { magnetLink, torrentFilePath } = await createQBittorrentTorrent(vodId, videoFilePath) + const { magnetLink, torrentFilePath } = await createQBittorrentTorrent(vodId, videoFilePath); - await uploadTorrentToSeedbox(videoFilePath, torrentFilePath) + await uploadTorrentToSeedbox(job, videoFilePath, torrentFilePath); - logger.debug(`updating vod record`); - await prisma.vod.update({ - where: { id: vodId }, - data: { magnetLink } + job.log(`updating vod record`); + await pb.collection('vods').update(vod.id, { + magnetLink }); - logger.info(`🏆 torrent creation complete.`) + job.log(`🏆 torrent creation complete.`); } \ No newline at end of file diff --git a/services/worker/src/processors/findWork.ts b/services/worker/src/processors/findWork.ts index 699caf09..547b1c0d 100644 --- a/services/worker/src/processors/findWork.ts +++ b/services/worker/src/processors/findWork.ts @@ -1,30 +1,55 @@ -import type { Task, Helpers } from "graphile-worker"; -import { PrismaClient } from "../../generated/prisma"; -import { withAccelerate } from "@prisma/extension-accelerate"; -import logger from "../utils/logger"; +import { Job } from "bullmq"; +import { getPocketBaseClient } from "../util/pocketbase"; +import Client from "pocketbase"; +import { generalQueue } from "../queues/generalQueue"; -const prisma = new PrismaClient().$extends(withAccelerate()); -const findWork: Task = async (_payload, helpers: Helpers) => { - logger.info(`findWork begin.`); +export async function findMissingTorrent(job: Job, pb: Client) { - const approvedUploads = await prisma.vod.findMany({ - where: { - OR: [ - { status: "approved" }, - { status: "pending" }, - ] - }, + const results = await pb.collection('vods').getList(1, 1, { + filter: "videoSrcB2 != '' && magnetLink = ''", + sort: '-streamDate' }); + const vods = results.items; + const vod = vods[0]; - logger.info(`findWork found ${approvedUploads.length} uploads.`); - for (let i = 0; i < approvedUploads.length; i++) { - const vod = approvedUploads[i]; - await helpers.addJob("scheduleVodProcessing", { vodId: vod.id }); - logger.info(`scheduleVodProcessing for vod ${vod.id}`); + job.log(`findWork found ${vod.id} in need of a torrent.`); + + const jobId = `createTorrent-${vod.id}`; + + const existingJob = await generalQueue.getJob(jobId); + + // only add a createTorrent job if there isn't one queued + if (existingJob?.isActive) { + job.log(`refusing to add createTorrent job for vod ${vod.id} because a similar job is active`); + } else { + + await generalQueue.add('createTorrent', { + vodId: vod.id + }, { + jobId + }); } +} - logger.info(`findWork finished.`); -}; -export default findWork; +/** + * + * findWork + * + * Remember to makes processors + * * idempotent + * * fail fast + * * DRY + */ +export async function findWork(job: Job) { + + const pb = await getPocketBaseClient(); + + await findMissingTorrent(job, pb); + + // findMissingThumbnail + // findMissingAudioAnalysis + // ... etc. + +} diff --git a/services/worker/src/processors/getAnnounceUrlDetails.spec.ts b/services/worker/src/processors/getAnnounceUrlDetails.spec.ts new file mode 100644 index 00000000..4941945e --- /dev/null +++ b/services/worker/src/processors/getAnnounceUrlDetails.spec.ts @@ -0,0 +1,34 @@ +import { getTweetDates, getTweetId, tweetIdToDate } from "./getAnnounceUrlDetails"; +import { test, expect, describe } from 'vitest'; + +describe('getAnnounceUrlDetails integration', () => { + test("getTweets", async () => { + + const testData = [ + { + announceUrl: 'https://x.com/ProjektMelody/status/1989881851817988146', + listedDate: '2025-11-16T02:22:58.000Z', + expectedDate: '2025-11-16T02:22:58.347Z' + }, { + announceUrl: 'https://x.com/ProjektMelody/status/1266096281002356736', + listedDate: '2020-05-28T19:57:30.000Z', + expectedDate: '2020-05-28T19:57:30.979Z', + }, { + announceUrl: 'https://x.com/ProjektMelody/status/1481416541493153795', + listedDate: '2022-01-13T00:03:21.000Z', + expectedDate: '2022-01-13T00:03:21.537Z', + } + ] + + const data = testData.map((datum) => ({ ...datum, date: tweetIdToDate(getTweetId(datum.announceUrl)) })) + + for (const datum of data) { + + expect(datum.date).toBeTruthy(); + expect(new Date(datum.date)).toStrictEqual(new Date(datum.expectedDate)); + + } + }, 30000); + + +}) diff --git a/services/worker/src/processors/getAnnounceUrlDetails.ts b/services/worker/src/processors/getAnnounceUrlDetails.ts new file mode 100644 index 00000000..291cb20e --- /dev/null +++ b/services/worker/src/processors/getAnnounceUrlDetails.ts @@ -0,0 +1,74 @@ +import { Job } from "bullmq"; +import { getPocketBaseClient } from "../util/pocketbase"; + + +const TWEET_URL_REGEX = + /^https?:\/\/(twitter\.com|x\.com)\/[A-Za-z0-9_]{1,15}\/status\/(\d+)(\?.*)?$/; + +export function tweetIdToDate(id: string | number): Date { + const snowflake = BigInt(id); + const timestamp = (snowflake >> 22n) + 1288834974657n; + return new Date(Number(timestamp)); +} + +export function getTweetId(url: string) { + if (!TWEET_URL_REGEX.test(url)) { + throw new Error(`Invalid tweet URL: ${url}`); + } + return url.split('/').at(-1)!; +} + +export function getTweetDates(tweetUrls: string[]): Date[] { + + tweetUrls.forEach((url) => { + if (!TWEET_URL_REGEX.test(url)) { + throw new Error(`Invalid tweet URL: ${url}`); + } + }); + + return tweetUrls + .map(url => url.split('/').at(-1)!) // add ! if you’re sure it exists + .map(tweetID => tweetIdToDate(tweetID)); +} + +export async function getVodsWithAnnounceUrlAndNoStreamDate() { + const pb = await getPocketBaseClient() + + const results = await pb.collection('vods').getList(1, 25, { + filter: "announceUrl != '' && streamDate = ''" + }) + const vods = results.items; + + return vods; +} + +/** + * + * getAnnounceUrlDetails + * + * given a vod.announceUrl, get the tweet timestamp and populate the streamDate + */ +export async function getAnnounceUrlDetails(job: Job) { + + const pb = await getPocketBaseClient(); + const vods = await getVodsWithAnnounceUrlAndNoStreamDate(); + + job.log(`getAnnounceUrlDetails found ${vods.length} vods in need of a streamDate.`) + + for (let i = 0; i < vods.length; i++) { + const vod = vods[i]; + job.log(`processing vod ${vod.id}`); + + const announceUrl = vod.announceUrl; + job.log(`announceUrl is ${vod.announceUrl}`); + + job.log(`getting streamDate from tweet`); + const streamDate = tweetIdToDate(getTweetId(announceUrl)); + job.log(`streamDate is ${streamDate}`); + + job.log('updating vod ' + vod.id); + await pb.collection('vods').update(vod.id, { + streamDate + }) + } +} diff --git a/services/worker/src/processors/screenRecordFansly.spec.ts b/services/worker/src/processors/screenRecordFansly.spec.ts new file mode 100644 index 00000000..7e78c9e1 --- /dev/null +++ b/services/worker/src/processors/screenRecordFansly.spec.ts @@ -0,0 +1,17 @@ +import { __screenRecordFansly } from "./screenRecordFansly"; +import { test, expect, describe } from 'vitest'; + +describe('screenRecordFansly', () => { + test("browser spawn", async () => { + + const result = await __screenRecordFansly('https://fansly.com/miamimilana'); + + expect(result).toEqual({ + complete: true, + results: { + "foo": "bar" + } + }); + }, 120000); + +}) diff --git a/services/worker/src/processors/screenRecordFansly.ts b/services/worker/src/processors/screenRecordFansly.ts new file mode 100644 index 00000000..5b41936d --- /dev/null +++ b/services/worker/src/processors/screenRecordFansly.ts @@ -0,0 +1,264 @@ +import { type Job } from 'bullmq'; +import { getPocketBaseClient } from '../util/pocketbase'; +import spawn, { SubprocessError } from 'nano-spawn'; +import { type Page, Locator } from 'puppeteer'; +import puppeteer from 'puppeteer-extra'; +import StealthPlugin from 'puppeteer-extra-plugin-stealth' +import env from '../../.config/env'; +puppeteer.use(StealthPlugin()) + + + +async function sleep(delay: number) { + return new Promise((resolve) => { + setTimeout(resolve, delay); + }); +} + + +type VTuber = { + id: string, + fansly: string, +} + +export async function screenRecordFansly(job: Job) { + job.log(`the job '${job.name}' is running`); + + const vtuberSlug = job.data?.vtuberSlug; + if (!vtuberSlug) throw new Error('vtuberSlug was missing from job data.'); + const pb = await getPocketBaseClient(); + + const list = await pb.collection('vtubers').getList(1, 50, { + filter: `slug = ${vtuberSlug}` + }) + if (list.items.length == 0) throw new Error(`failed to find matching vtuber (slug=${vtuberSlug}) in the database.`); + const vtuber = list.items.at(0) as unknown as VTuber; + + const fansly = vtuber?.fansly + if (!fansly) throw new Error(`vtuber ${vtuber?.id} was missing a fansly URL`); + + await __screenRecordFansly(fansly); +} + + + +export async function __screenRecordFansly(fansly: string) { + + let loop = true; + // get the vtuber we are to record (pocketbase record) + // get the vtuber's fansly + // open Firefox to the fansly URL + // ensure we are logged in + // ensure the stream is started + // open OBS and record + + // detect the end of stream + // stop recording + + // check for chromium + // try { + // await spawn('brave-browser', ['--help']); + // } catch (e) { + // if (e instanceof SubprocessError) { + // if (e.exitCode !== 0) { + // throw new Error('chromium exited with rc ' + e.exitCode); + // } + // } + // } + + // await spawn('firefox', ['--display', ':0.0', '-P', 'fp-worker', fansly]); + + + // Launch the browser and open a new blank page. + const browser = await puppeteer.launch({ headless: false, executablePath: '/usr/bin/brave-browser' }); + const page = await browser.newPage(); + + // Navigate the page to a URL. + await page.goto(fansly); + + // Set screen size. + await page.setViewport({ width: 1920, height: 1080 }); + + + // determine the screen we are on + async function handlePage(page: Page) { + console.log('Looking for a known page...'); + const warningPage = page.locator('div ::-p-text(Possible Age Restricted Content)'); + const loginModal = page.locator('div ::-p-text(Forgot password?)'); + const liveLoginRequiredPage = page.locator('::-p-text(to view the Stream)'); + const modelPage = page.locator('::-p-text( Live )'); + const notificationsModal = page.locator('::-p-text(Enable Push Notifications)'); + const streamSensitiveContent = page.locator('::-p-text(This Stream may contain sensitive content)'); + const following = page.locator('div ::-p-text(following)'); + const payment = page.locator('div ::-p-text(Valid Payment Method Required)'); + const video = page.locator('::-p-xpath(//video[contains(@src, "blob:")])') + + const foundPage = await Promise.race([ + warningPage.wait().then(() => 'warning'), + loginModal.wait().then(() => 'login'), + liveLoginRequiredPage.wait().then(() => 'liveLoginRequired'), + modelPage.wait().then(() => 'model'), + notificationsModal.wait().then(() => 'notifications'), + streamSensitiveContent.wait().then(() => 'sensitive'), + following.wait().then(() => 'following'), + payment.wait().then(() => 'payment'), + video.wait().then(() => 'video'), + ]) + + console.log('foundPage', foundPage); + switch (foundPage) { + case 'video': + await handleVideo(page); + break; + + case 'warning': + await handleWarningModal(page); + break; + + case 'login': + await handleLoginPage(page); + break; + + case 'liveLoginRequired': + await handleLiveLoginRequired(page); + break; + + case 'model': + await handleModelPage(page); + break; + + case 'fansly': + await handleLoginPage(page); + break; + + case 'sensitive': + await handleStreamSensitiveContent(page); + break; + + case 'notifications': + await handleNotifications(page); + break; + + case 'following': + await handleFollowing(page); + break; + + case 'payment': + throw new Error('payment method is required! Please use an account that already has payment enabled'); + break; + + default: + console.error("Unknown page"); + throw new Error("No known page matched"); + } + + + } + + + // 🟦 Page-specific handlers + async function handleVideo(page: Page) { + console.log('Handling video'); + + // * [ ] ensure we are at 50% volume + // * [ ] ensure we are playing + // * [ ] ensure we are theater mode + // * [ ] ensure we hide the "Brave is being controlled by automated test software" + // * [ ] ensure we hide brave notifications + await page.locator('') + loop = false; + + } + + async function handleFollowing(page: Page) { + console.log('Handling following'); + await page.locator('div ::-p-text(following)').click(); + } + + async function handleStreamSensitiveContent(page: Page) { + console.log('Handling stream sensitive content'); + await page.locator('div ::-p-text(here)').click(); + } + + async function handleNotifications(page: Page) { + console.log('Handling push notifications modal'); + await page.locator('div ::-p-text(Maybe Later)').click(); + } + + async function handleModelPage(page: Page) { + console.log('Handling model page...'); + await page.locator('::-p-text( Live )').click(); + } + + + async function handleLoginPage(page: Page) { + console.log("Handling login page..."); + await page.locator('#fansly_login').fill(env.FANSLY_USERNAME); + await page.locator('#fansly_password').fill(''); + await page.locator('#fansly_password').fill(env.FANSLY_PASSWORD); + await page.locator('::-p-text(Sign in)').click(); + } + + async function handleLiveLoginRequired(page: Page) { + console.log("Handling handleLiveLoginRequired..."); + await page.locator('body > app-root > div > div.site-wrapper.nav-bar-visible.nav-bar-mobile-visible.nav-bar-top-visible.is-live-route > div > app-live-route > div > div > div.flex-col.video-wrapper > div.flex-row.width-100.stream-placeholder > div.stream-placeholder-content > div > span:nth-child(1)').click(); + } + + async function handleTosModal(page: Page) { + console.log("Handling TOS modal..."); + await page.locator('#accept').click(); + } + + async function handleEmailVerification(page: Page) { + console.log("Handling email verification page..."); + await page.locator('#resend-link').click(); + } + + async function handleWarningModal(page: Page) { + console.log('clicking the Enter button...') + // const el = await page.locator('div ::-p-text(Enter)').waitHandle(); + // el?.evaluate((element) => element.click()) + + const el = await page.locator('div ::-p-text("Enter")').waitHandle(); + + if (el) { + await el.click({ count: 2, delay: 1000, debugHighlight: true }); // this usually fails + page.$eval('div.btn:nth-child(2)', el => el.click()); // this usually works + } + } + + + + while (loop) { + try { + await handlePage(page); + } catch (e) { + console.error('error while handling page.') + console.error(e) + } + await sleep(3000); + } + + + console.log('Now we sit back and enjoy teh show'); + + await obsRecord(); + + + // @todo wait for stream to end + await sleep(60000); + + // await browser.close(); + + + const results = { + foo: 'bar' + } + + return { complete: true, results }; +} + + +export async function obsRecord() { + await spawn('obs', ['--startrecording', '--minimize-to-tray']); +} \ No newline at end of file diff --git a/services/worker/src/queues/generalQueue.ts b/services/worker/src/queues/generalQueue.ts index ea0aaaa5..d1b1c3a0 100644 --- a/services/worker/src/queues/generalQueue.ts +++ b/services/worker/src/queues/generalQueue.ts @@ -1,5 +1,6 @@ import { Queue } from 'bullmq'; -export const generalQueue = new Queue('generalQueue'); +import { connection } from '../../.config/bullmq.config'; +export const generalQueue = new Queue('generalQueue', { connection }); await generalQueue.upsertJobScheduler( 'every-day-presign-mux-job', @@ -14,14 +15,7 @@ await generalQueue.upsertJobScheduler( ); // await generalQueue.upsertJobScheduler( -// 'copy-v2', -// { -// pattern: '* * * * *', // Runs at 07:03 every day -// }, -// { -// name: 'copyV2ThumbToV3', -// data: {}, -// opts: {}, // Optional additional job options -// }, -// ); - +// 'find-work-every-one-minute', +// { every: 1000 * 60 }, +// { name: 'findWork' } +// ) \ No newline at end of file diff --git a/services/worker/src/queues/gpuQueue.ts b/services/worker/src/queues/gpuQueue.ts index 154f919b..d229af46 100644 --- a/services/worker/src/queues/gpuQueue.ts +++ b/services/worker/src/queues/gpuQueue.ts @@ -1,7 +1,7 @@ import { Queue } from "bullmq"; - -export const gpuQueue = new Queue('gpuQueue'); +import { connection } from "../../.config/bullmq.config"; +export const gpuQueue = new Queue('gpuQueue', { connection }); await gpuQueue.upsertJobScheduler( 'schedule-vod-processing-recurring', diff --git a/services/worker/src/queues/highPriorityQueue.ts b/services/worker/src/queues/highPriorityQueue.ts index 33fc162f..dd23bc17 100644 --- a/services/worker/src/queues/highPriorityQueue.ts +++ b/services/worker/src/queues/highPriorityQueue.ts @@ -1,5 +1,6 @@ import { Queue } from "bullmq"; -export const highPriorityQueue = new Queue('highPriorityQueue'); +import { connection } from "../../.config/bullmq.config"; +export const highPriorityQueue = new Queue('highPriorityQueue', { connection }); await highPriorityQueue.upsertJobScheduler( 'sync-patreon-recurring-job', @@ -11,4 +12,16 @@ await highPriorityQueue.upsertJobScheduler( data: {}, opts: {} }, -) \ No newline at end of file +) + +await highPriorityQueue.upsertJobScheduler( + 'get-announce-url-details', + { + every: 1000 * 63 + }, + { + name: 'getAnnounceUrlDetails', + data: {}, + opts: {}, + }, +); \ No newline at end of file diff --git a/services/worker/src/queues/parentQueue.ts b/services/worker/src/queues/parentQueue.ts deleted file mode 100644 index 7e3543a9..00000000 --- a/services/worker/src/queues/parentQueue.ts +++ /dev/null @@ -1,4 +0,0 @@ -import { Queue } from 'bullmq'; -import { connection } from '../../.config/bullmq.config.ts'; - -export const parentMq = new Queue('parentMq', { connection }); diff --git a/services/worker/src/types.ts b/services/worker/src/types.ts new file mode 100644 index 00000000..093e85ca --- /dev/null +++ b/services/worker/src/types.ts @@ -0,0 +1,7 @@ +export type Vod = { + id: string, + magnetLink: string, + announceUrl: string, + videoSrcB2: string, + sourceVideo: string, +} \ No newline at end of file diff --git a/services/worker/src/util/qbittorrent.README.md b/services/worker/src/util/qbittorrent.README.md new file mode 100644 index 00000000..24e440ef --- /dev/null +++ b/services/worker/src/util/qbittorrent.README.md @@ -0,0 +1,7 @@ +qbittorrent library common problems and their solutions + +### Error: Torrent creation failed: Create new torrent file failed. Reason: no files in torrent [libtorrent:16]. + +This can happen when you give createTorrent() a file path that doesn't exist. + +The solution is to give createTorrent() a file path that exists in the qbittorrent-nox context. (If you're using a docker container, it could be a mounted volume issue) \ No newline at end of file diff --git a/services/worker/src/util/qbittorrent.spec.ts b/services/worker/src/util/qbittorrent.spec.ts new file mode 100644 index 00000000..b03f335d --- /dev/null +++ b/services/worker/src/util/qbittorrent.spec.ts @@ -0,0 +1,57 @@ +import { QBittorrentClient } from "./qbittorrent"; +import { test, expect, describe, beforeAll, expectTypeOf } from 'vitest'; +import { join } from "node:path"; + + +const fixturesDir = join(import.meta.dirname, '..', 'fixtures'); +const torrentFixture = join(fixturesDir, 'ubuntu-24.04.3-desktop-amd64.iso.torrent'); +const fileFixture = join(fixturesDir, 'pizza.avif'); + +describe('qbittorrent integration', () => { + let client: QBittorrentClient + beforeAll(async () => { + client = new QBittorrentClient(); + }); + + test("QBittorrentClient methods", async () => { + + + expect(client).toHaveProperty('addTorrent'); + expect(client).toHaveProperty('getInfoHashV2'); + expect(client).toHaveProperty('connect'); + expect(client).toHaveProperty('getMagnetLink'); + expect(client).toHaveProperty('createTorrent'); + expect(client).toHaveProperty('__getVersion'); + expect(client).toHaveProperty('__fetchTorrentFile'); + + }); + + test("__getVersion", async () => { + await client.connect(); + const { major, minor, patch, version } = await client.__getVersion(); + expectTypeOf(major).toBeNumber(); + expectTypeOf(minor).toBeNumber(); + expectTypeOf(patch).toBeNumber(); + expectTypeOf(version).toBeString(); + }); + + test("addTorrent", async () => { + const torrent = await client.addTorrent(torrentFixture); + expect(torrent).toHaveProperty('infohash_v1'); + expect(torrent).toHaveProperty('infohash_v2'); + expect(torrent).toHaveProperty('added_on'); + expect(torrent).toHaveProperty('magnet_uri'); + }); + test("connect", async () => { + await client.connect(); + }); + + test("createTorrent", async () => { + const torrent = await client.createTorrent(fileFixture); + console.log('torrent', torrent) + expect(torrent).toHaveProperty('magnetLink'); + expect(torrent).toHaveProperty('torrentFilePath'); + expect(torrent).toHaveProperty('info'); + }); + +}) diff --git a/services/worker/src/util/qbittorrent.ts b/services/worker/src/util/qbittorrent.ts new file mode 100644 index 00000000..cdb127c3 --- /dev/null +++ b/services/worker/src/util/qbittorrent.ts @@ -0,0 +1,500 @@ +/** + * src/utils/qbittorrent.ts + * + * qBittorrent API client + * Used for creating torrent v1/v2 hybrids. + * + * + * to keep things simple, + * we use the same mounted volume directory names inside the docker container + * as on the host. + * + * /srv/futureporn/worker/qbittorrent/downloads + * + */ + +import path from "node:path"; +import env from "../../.config/env"; +import { readFile, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import { join, basename } from "node:path"; +import { nanoid } from 'nanoid'; +import semverParse from 'semver/functions/parse'; +import { type SemVer } from 'semver'; + + + +interface QBittorrentClientOptions { + host?: string; + port?: number; + username?: string; + password?: string; +} + + +export interface TorrentCreatorTaskStatus { + errorMessage: string; + optimizeAlignment: boolean; + paddedFileSizeLimit: number; + pieceSize: number; + private: boolean; + sourcePath: string; + status: "Running" | "Finished" | "Failed" | string; // API may expand + taskID: string; + timeAdded: string; // raw string from qBittorrent + timeFinished: string; // raw string + timeStarted: string; // raw string + trackers: string[]; + urlSeeds: string[]; +} + +export type TorrentCreatorTaskStatusMap = Record; + +export interface QBTorrentInfo { + added_on: number; + amount_left: number; + auto_tmm: boolean; + availability: number; + category: string; + comment: string; + completed: number; + completion_on: number; + content_path: string; + dl_limit: number; + dlspeed: number; + download_path: string; + downloaded: number; + downloaded_session: number; + eta: number; + f_l_piece_prio: boolean; + force_start: boolean; + has_metadata: boolean; + hash: string; + inactive_seeding_time_limit: number; + infohash_v1: string; + infohash_v2: string; + last_activity: number; + magnet_uri: string; + max_inactive_seeding_time: number; + max_ratio: number; + max_seeding_time: number; + name: string; + num_complete: number; + num_incomplete: number; + num_leechs: number; + num_seeds: number; + popularity: number; + priority: number; + private: boolean; + progress: number; + ratio: number; + ratio_limit: number; + reannounce: number; + root_path: string; + save_path: string; + seeding_time: number; + seeding_time_limit: number; + seen_complete: number; + seq_dl: boolean; + size: number; + state: string; + super_seeding: boolean; + tags: string; + time_active: number; + total_size: number; + tracker: string; + trackers_count: number; + up_limit: number; + uploaded: number; + uploaded_session: number; + upspeed: number; +} + + + +/** + * QBittorrentClient + * + * @see https://qbittorrent-api.readthedocs.io/en/latest/apidoc/torrentcreator.html + */ +export class QBittorrentClient { + private readonly host: string; + private readonly port: number; + private readonly username: string; + private readonly password: string; + private readonly baseUrl: string; + private sidCookie: string | null = null; + + constructor(options: QBittorrentClientOptions = {}) { + const defaults = { + host: "localhost", + port: 8083, + username: "admin", + password: "adminadmin", + }; + + const envOptions = { + host: env.QBT_HOST!, + port: Number(env.QBT_PORT!), + username: env.QBT_USERNAME!, + password: env.QBT_PASSWORD!, + }; + + const { host, port, username, password } = { + ...defaults, + ...envOptions, + ...options, + }; + this.host = host; + this.port = port; + this.username = username; + this.password = password; + + this.baseUrl = `http://${this.host}:${this.port}`; + } + + async connect(): Promise { + console.log(`Connecting to qBittorrent at ${this.baseUrl}`); + await this.login(); + } + + /** + * Throw if QBittorrent version is less than 5 + */ + async versionCheck(): Promise { + await this.connect(); + const { major, version } = (await this.__getVersion()); + if (major < 5) throw new Error(`QBittorrent is outdated. Expected version >5. Got ${version}`); + } + + /** + * + * Query QBittorrent to get it's version. + */ + async __getVersion(): Promise { + const url = `${this.baseUrl}/api/v2/app/version`; + const res = await fetch(url, { headers: { "Cookie": this.sidCookie! } }); + const text = await res.text(); + const v = semverParse(text); + if (!v?.version) throw new Error(`failed to parse version from body text=${text}`); + return v; + } + + /** + * Logs into qBittorrent Web API. + * + * Example (cURL): + * curl -i \ + * --header 'Referer: http://localhost:8080' \ + * --data 'username=admin&password=adminadmin' \ + * http://localhost:8080/api/v2/auth/login + * + * Then use the returned SID cookie for subsequent requests. + */ + private async login(): Promise { + console.log(`login() begin. using username=${this.username}, password=${this.password} env=${env.NODE_ENV}`) + const response = await fetch(`${this.baseUrl}/api/v2/auth/login`, { + method: "POST", + headers: { + "Content-Type": "application/x-www-form-urlencoded", + Referer: this.baseUrl, + }, + body: new URLSearchParams({ + username: this.username, + password: this.password, + }), + }); + + const responseBody = await response.text(); + + if (!response.ok) { + const msg = `Login request failed (${response.status} ${response.statusText}). body=${responseBody}`; + console.error(msg); + throw new Error(msg); + } + + console.log(`Login response: status=${response.status} ${response.statusText}`); + console.log(`Headers: ${JSON.stringify([...response.headers.entries()])}`); + + // Extract SID cookie + const setCookie = response.headers.get("set-cookie"); + if (!setCookie) { + const msg = `Login failed: No SID cookie was returned. status=${response.status} ${response.statusText}. body=${responseBody}`; + console.error(msg); + throw new Error(msg); + } + + this.sidCookie = setCookie; + console.log("Successfully logged into qBittorrent."); + } + + + + + /** + * addTorrentCreationTask + * + * @param sourcePath - a file on disk that we want to turn into a torrent + * @returns torrentFilePath - a newly created torrent file + * @IMPORTANT we do not send a multipart form here. + * we only send a application/x-www-form-urlencoded + * form, and we do not upload file data. we only send a file path. + */ + private async addTorrentCreationTask(sourcePath: string): Promise { + const torrentFilePath = join(tmpdir(), `${nanoid()}.torrent`); + const url = `${this.baseUrl}/api/v2/torrentcreator/addTask`; + console.log(`addTorrentCreationTask using sourcePath=${sourcePath}, torrentFilePath=${torrentFilePath}, url=${url}`); + + + console.log(`addTorrent using sourcePath=${sourcePath}`) + + if (!this.sidCookie) { + throw new Error("Not connected: SID cookie missing"); + } + + const trackers = [ + 'udp://tracker.future.porn:6969/announce' + ] + const params = new URLSearchParams({ + sourcePath, + isPrivate: "false", + format: "hybrid", + torrentFilePath: torrentFilePath, + comment: "https://futureporn.net", + source: "https://futureporn.net", + // trackers: trackers.join('\n'), // this doesn't work. the two trackers appear on the same line in a single, invalid URL + urlSeeds: "", + startSeeding: "false", // it always fails to seed right away (it works later.) + }); + params.append('trackers', trackers[0]); + // params.append('trackers', trackers[1]); // this is a problem. it overrides the first. + + + const res = await fetch(url, { + method: "POST", + headers: { + Cookie: this.sidCookie, + 'Content-Type': 'application/x-www-form-urlencoded', + }, + body: params as any, + }); + + if (!res.ok) { + const body = await res.text(); + throw new Error(`addTorrentCreationTask failed: ${res.status} ${res.statusText} ${body}`); + } + + console.log('addTorrent success.'); + + + + + if (!res.ok) { + const body = await res.text() + throw new Error(`addTask failed. status=${res.status} statusText=${res.statusText} body=${body}`); + } + + const text = await res.text(); + if (text.includes('Fail')) { + throw new Error('the response shows a failure--' + text); + } + + const data = JSON.parse(text); + console.log({ addTaskResponse: data }); + return data.taskID; + } + + + private async pollTorrentStatus(taskId: string): Promise { + while (true) { + console.log(`Polling torrent creation taskID=${taskId}`); + + const res = await fetch( + `${this.baseUrl}/api/v2/torrentcreator/status?taskId=${taskId}`, + { headers: { Cookie: this.sidCookie! } } + ); + + if (!res.ok) { + throw new Error(`status failed: ${res.status} ${res.statusText}`); + } + console.log('the request to poll for torrent status was successful.') + + const statusMap = (await res.json()) as TorrentCreatorTaskStatusMap; + console.log({ statusMap: statusMap }) + const task = Object.values(statusMap).find((t) => t.taskID === taskId); + console.log({ task: task }) + + + + if (!task) { + throw new Error(`Task ${taskId} not found in status response`); + } + + console.log(` Torrent creator task status=${task.status}`); + + switch (task.status) { + case "Failed": + const msg = `Torrent creation failed: ${task.errorMessage}`; + console.error(msg); + console.log('here is the task that failed', task); + throw new Error(msg); + case "Finished": + return task; + default: + // still running.. wait 1s and retry + await new Promise((r) => setTimeout(r, 1000)); + } + } + } + + + /** + * Fetch a .torrent file from qBittorrent. + * qBittorrent writes the .torrent file to a given local path + * + * @param taskId + * @param outputDir - the torrent will be written to this directory + * @returns + */ + private async __fetchTorrentFile(taskId: string, outputDir: string): Promise { + const url = `${this.baseUrl}/api/v2/torrentcreator/torrentFile?taskID=${taskId}`; + const res = await fetch(url, { + headers: { Cookie: this.sidCookie! }, // or use Authorization header + }); + + if (!res.ok) { + const body = await res.text(); + throw new Error(`torrentFile failed: ${res.status} ${res.statusText} ${body}`); + } + + const arrayBuffer = await res.arrayBuffer(); + const filePath = join(outputDir, `${taskId}.torrent`); + await writeFile(filePath, new Uint8Array(arrayBuffer)); + + return filePath; + } + + private async __getTorrentInfos(torrentName: string): Promise { + if (!torrentName) throw new Error('__getTorrentInfos requires torrentName as first arg. However, arg was falsy. '); + // ensure we're logged in + if (!this.sidCookie) throw new Error('We are not logged into QBittorrent. (sidCookie was missing.)'); + + // qBittorrent does NOT return infoHash directly here + // we have to get it by querying the torrents list + const torrentsRes = await fetch(`${this.baseUrl}/api/v2/torrents/info`, { + headers: { Cookie: this.sidCookie! }, + }); + const torrents = await torrentsRes.json() as Array<{ hash: string; name: string }>; + const torrent = torrents.find((t) => t.name === torrentName) as QBTorrentInfo; + + if (!torrent) { + throw new Error(`Torrent ${torrentName} not found in qBittorrent after adding`); + } + return torrent; + } + + async getInfoHashV2(torrentName: string): Promise { + console.log(`getInfoHashV2 using torrentName=${torrentName}`) + const torrent = await this.__getTorrentInfos(torrentName); + return torrent.infohash_v2; + } + + /** + * Add a local torrent file to qBittorrent. + */ + async addTorrent(localFilePath: string) { + if (!localFilePath) throw new Error('addTorrent requires a path to a local file, but it was undefined.'); + const bn = basename(localFilePath).replace('.torrent', ''); + await this.connect(); + await this.__addTorrent(localFilePath); + return (await this.__getTorrentInfos(bn)); + } + + /** + * Add a local torrent file to qBittorrent. + */ + private async __addTorrent(localFilePath: string): Promise { + + console.log(`addTorrent using localFilePath=${localFilePath}`) + + if (!this.sidCookie) { + throw new Error("Not connected. (SID cookie missing.)"); + } + + const form = new FormData(); + const fileBuffer = await readFile(localFilePath); + + + const blob = new Blob([fileBuffer]); // wrap Buffer in Blob (necessary for MDN FormData) + form.append("torrents", blob, path.basename(localFilePath)); // this is for MDN FormData + // form.append("torrents", fileBuffer); // this is for npm:form-data + + + form.append("savepath", "/tmp"); // optional: specify download path + form.append("paused", "true"); // start downloading immediately + + const res = await fetch(`${this.baseUrl}/api/v2/torrents/add`, { + method: "POST", + headers: { + Cookie: this.sidCookie, + }, + body: form as any, + }); + + if (!res.ok) { + const body = await res.text(); + throw new Error(`addTorrent failed: ${res.status} ${res.statusText} ${body}`); + } + + console.log('addTorrent success.'); + } + + async getMagnetLink(fileName: string): Promise { + console.log(`getMagnetLink using fileName=${fileName}`) + + // qBittorrent does NOT return infoHash directly here + // we have to get it by querying the torrents list + const torrent = await this.__getTorrentInfos(fileName); + + if (!torrent) { + throw new Error(`Torrent ${fileName} not found in qBittorrent after adding`); + } + + return torrent.magnet_uri; + } + + async createTorrent(localFilePath: string): Promise<{ torrentFilePath: string; magnetLink: string, info: QBTorrentInfo }> { + console.log(`Creating torrent from file: ${localFilePath}`); + await this.connect(); + + if (!this.sidCookie) { + throw new Error("sidCookie was missing. This is likely a bug."); + } + + // 1. start task + const taskId = await this.addTorrentCreationTask(localFilePath); + console.log(`Created torrent task ${taskId}`); + + // 2. poll until finished + await this.pollTorrentStatus(taskId); + + // 3. fetch torrent file + const torrentFilePath = await this.__fetchTorrentFile(taskId, tmpdir()); + + // 4. add the torrent to qBittorrent + // We *could* add the torrent in the torrentcreator, + // but that usually errors right away, + // so we add it here instead. + await this.__addTorrent(torrentFilePath); + + // 5. Get magnet link + const info = await this.__getTorrentInfos(basename(localFilePath)) + const magnetLink = info.magnet_uri; + + return { + magnetLink, torrentFilePath, info + }; + } +} + +export const qbtClient = new QBittorrentClient({ host: env.QBT_HOST }); diff --git a/services/worker/src/util/retry.ts b/services/worker/src/util/retry.ts new file mode 100644 index 00000000..4f9b8597 --- /dev/null +++ b/services/worker/src/util/retry.ts @@ -0,0 +1,15 @@ +import sleep from './sleep.ts'; + +export default async function retry(fn: any, retries = 6, delayMs = 500) { + let lastError; + for (let attempt = 1; attempt <= retries; attempt++) { + try { + return await fn(); + } catch (err) { + lastError = err; + console.warn(`Attempt ${attempt} failed: ${err}. Retrying in ${delayMs}ms...`); + if (attempt < retries) await sleep(delayMs); + } + } + throw lastError; +} \ No newline at end of file diff --git a/services/worker/src/util/sftp.ts b/services/worker/src/util/sftp.ts new file mode 100644 index 00000000..bae1ecae --- /dev/null +++ b/services/worker/src/util/sftp.ts @@ -0,0 +1,118 @@ +// src/utils/sftp.ts +import { Client, ConnectConfig, SFTPWrapper } from 'ssh2'; +import path from 'path'; +import env from '../../.config/env'; + +interface SSHClientOptions { + host: string; + port?: number; + username: string; + password?: string; + privateKey?: Buffer; +} + +export class SSHClient { + private client = new Client(); + private sftp?: SFTPWrapper; + private connected = false; + + constructor(private options: SSHClientOptions) { } + + async connect(): Promise { + if (this.connected) return; + + await new Promise((resolve, reject) => { + this.client + .on('ready', () => resolve()) + .on('error', reject) + .connect({ + host: this.options.host, + port: this.options.port || 22, + username: this.options.username, + password: this.options.password, + privateKey: this.options.privateKey, + } as ConnectConfig); + }); + + this.connected = true; + } + + private async getSFTP(): Promise { + if (!this.sftp) { + this.sftp = await new Promise((resolve, reject) => { + this.client.sftp((err, sftp) => { + if (err) reject(err); + else resolve(sftp); + }); + }); + } + return this.sftp; + } + + async exec(command: string): Promise { + await this.connect(); + return new Promise((resolve, reject) => { + this.client.exec(command, (err, stream) => { + if (err) return reject(err); + let stdout = ''; + let stderr = ''; + stream + .on('close', (code: number) => { + if (code !== 0) reject(new Error(`Command failed: ${stderr}`)); + else resolve(stdout.trim()); + }) + .on('data', (data: Buffer) => (stdout += data.toString())) + .stderr.on('data', (data: Buffer) => (stderr += data.toString())); + }); + }); + } + + async uploadFile(localFilePath: string, remoteDir: string): Promise { + console.log(`Uploading localFilePath=${localFilePath} to remoteDir=${remoteDir}...`); + + console.log('awaiting connect') + await this.connect(); + + console.log('getting sftp') + const sftp = await this.getSFTP(); + + console.log('getting fileName') + const fileName = path.basename(localFilePath); + + console.log(`fileName=${fileName}`) + + const remoteFilePath = path.posix.join(remoteDir, fileName); + console.log(`remoteFilePath=${remoteFilePath}`) + + await new Promise((resolve, reject) => { + sftp.fastPut(localFilePath, remoteFilePath, (err) => (err ? reject(err) : resolve())); + }); + } + + async downloadFile(remoteFilePath: string, localPath: string): Promise { + console.log(`downloading remoteFilePath=${remoteFilePath} to localPath=${localPath}`) + await this.connect(); + const sftp = await this.getSFTP(); + + await new Promise((resolve, reject) => { + sftp.fastGet(remoteFilePath, localPath, (err: any) => (err ? reject(err) : resolve())); + }); + } + + end(): void { + this.client.end(); + this.connected = false; + } +} + +// --- usage helper --- +const url = URL.parse(env.SEEDBOX_SFTP_URL); +const hostname = url?.hostname; +const port = url?.port; + +export const sshClient = new SSHClient({ + host: hostname!, + port: port ? parseInt(port) : 22, + username: env.SEEDBOX_SFTP_USERNAME, + password: env.SEEDBOX_SFTP_PASSWORD, +}); diff --git a/services/worker/src/util/sleep.ts b/services/worker/src/util/sleep.ts new file mode 100644 index 00000000..7059d6b4 --- /dev/null +++ b/services/worker/src/util/sleep.ts @@ -0,0 +1,3 @@ +export default async function sleep(ms: number) { + return new Promise(resolve => setTimeout(resolve, ms)); +} \ No newline at end of file diff --git a/services/worker/src/workers/generalWorker.ts b/services/worker/src/workers/generalWorker.ts index 16ec6694..a9631b46 100644 --- a/services/worker/src/workers/generalWorker.ts +++ b/services/worker/src/workers/generalWorker.ts @@ -3,21 +3,37 @@ import { Worker } from 'bullmq'; import { connection } from '../../.config/bullmq.config.ts'; import { presignMuxAssets } from '../processors/presignMuxAssets.ts'; +import { copyV1VideoAll } from '../processors/copyV1VideoAll.ts'; import { copyV2ThumbToV3 } from '../processors/copyV2ThumbToV3.ts'; - +import { copyV1VideoToV3 } from '../processors/copyV1VideoToV3.ts'; +import { createTorrent } from '../processors/createTorrent.ts'; +import { analyzeAudio } from '../processors/analyzeAudio.ts'; +import { findWork } from '../processors/findWork.ts'; new Worker( 'generalQueue', async (job) => { console.log('generalWorker. we got a job on the generalQueue.', job.data, job.name); switch (job.name) { + case 'findWork': + return await findWork(job); + + case 'copyV1VideoAll': + return await copyV1VideoAll(job); + + case 'copyV1VideoToV3': + return await copyV1VideoToV3(job); + case 'presignMuxAssets': return await presignMuxAssets(job); case 'copyV2ThumbToV3': return await copyV2ThumbToV3(job); - // case 'analyzeAudio': - // return await analyzeAudio(job); + case 'createTorrent': + return await createTorrent(job); + + case 'analyzeAudio': + return await analyzeAudio(job); default: throw new Error(`Unknown job name: ${job.name}`); @@ -27,3 +43,4 @@ new Worker( ); console.log('generalWorker is running...'); + diff --git a/services/worker/src/workers/highPriorityWorker.ts b/services/worker/src/workers/highPriorityWorker.ts index 0b650606..4263d65c 100644 --- a/services/worker/src/workers/highPriorityWorker.ts +++ b/services/worker/src/workers/highPriorityWorker.ts @@ -3,6 +3,7 @@ import { Worker } from 'bullmq'; import { connection } from '../../.config/bullmq.config.ts'; import { syncronizePatreon } from '../processors/syncronizePatreon.ts' +import { getAnnounceUrlDetails } from '../processors/getAnnounceUrlDetails.ts' new Worker( 'highPriorityQueue', @@ -11,7 +12,8 @@ new Worker( switch (job.name) { case 'syncronizePatreon': return await syncronizePatreon(job); - + case 'getAnnounceUrlDetails': + return await getAnnounceUrlDetails(job); default: throw new Error(`Unknown job name: ${job.name}`); } diff --git a/services/worker/systemd/up.sh b/services/worker/systemd/up.sh index 6c94b5f3..a3fd3545 100755 --- a/services/worker/systemd/up.sh +++ b/services/worker/systemd/up.sh @@ -2,7 +2,15 @@ loginctl enable-linger sudo cp worker.service /etc/systemd/user/worker.service + + systemctl --user daemon-reload systemctl --user restart worker +systemctl --user enable worker +systemctl --user status worker +systemctl --user status worker +systemctl status valkey + +echo "Done. Press Enter key to get worker logs or press Ctrl+C to quit" +read r journalctl --user -u worker -ef -systemctl --user status worker \ No newline at end of file diff --git a/services/worker/systemd/worker.service b/services/worker/systemd/worker.service index ceac9a8b..98d64e09 100644 --- a/services/worker/systemd/worker.service +++ b/services/worker/systemd/worker.service @@ -6,7 +6,7 @@ After=network.target Type=simple Restart=always RestartSec=5 -ExecStart=/home/cj/Documents/futureporn-monorepo/services/worker/entrypoint.sh +ExecStart=/home/cj/.nvm/versions/node/v22.18.0/bin/node --import tsx ./src/index.ts WorkingDirectory=/home/cj/Documents/futureporn-monorepo/services/worker EnvironmentFile=/home/cj/Documents/futureporn-monorepo/services/worker/.env.production.local Restart=on-failure