Compare commits

..

2 Commits

Author SHA1 Message Date
87b054e66f restrict playback to logged in visitors
Some checks failed
ci / test (push) Failing after 4m45s
fp/our CI/CD / build (push) Successful in 58s
2025-11-19 17:46:35 -08:00
741b06d22e remove ipfsCid 2025-11-16 06:28:11 -08:00
68 changed files with 2280 additions and 158 deletions

19
.vscode/tasks.json vendored
View File

@ -30,7 +30,7 @@
"type": "shell",
"command": "docker run -it -p 5050:5050 --rm --name futureporn-pgadmin -e PGADMIN_LISTEN_PORT=5050 -e PGADMIN_DISABLE_POSTFIX=1 -e PGADMIN_DEFAULT_EMAIL=cj@futureporn.net -e PGADMIN_DEFAULT_PASSWORD=password dpage/pgadmin4",
"problemMatcher": [],
"isBackground": true,
"isBackground": true
},
{
"label": "Run Docker Compose",
@ -106,10 +106,23 @@
"runOn": "folderOpen"
}
},
{
"label": "Run qBittorrent",
"type": "shell",
"command": "qbittorrent-nox --confirm-legal-notice --webui-port=8069 --profile=/home/cj/.config --configuration=futureporn",
"isBackground": true,
"options": {
"cwd": "${workspaceFolder}/services/worker"
},
"runOptions": {
"runOn": "folderOpen"
},
"problemMatcher": []
},
{
"label": "Run valkey",
"type": "shell",
"command": "docker run --name futureporn-valkey --rm -p 6379:6379 valkey/valkey",
"command": "docker run --name futureporn-valkey-DEVELOPMENT --rm -p 6667:6379 valkey/valkey",
"isBackground": true,
"problemMatcher": [],
"options": {
@ -124,7 +137,7 @@
"type": "shell",
"command": "curl http://localhost:3000/task?title=fmv",
"isBackground": false,
"problemMatcher": [],
"problemMatcher": []
}
]
}

View File

@ -1,6 +1,6 @@
{
"name": "futureporn",
"version": "3.4.0",
"version": "3.5.0",
"private": true,
"description": "Dedication to the preservation of lewdtuber history",
"license": "Unlicense",

View File

@ -42,11 +42,11 @@
<a href="/account">Account</a>
</div>
<div class="level-item">
<a href="/auth/logout">Logout</a>
<a href="/auth/logout?returnTo=<%= request.url.href %>">Logout</a>
</div>
<% } else { %>
<div class="level-item">
<a href="/auth/login">Login</a>
<a href="/auth/login?returnTo=<%= request.url.href %>">Login</a>
</div>
<% } %>
</div>
@ -71,12 +71,25 @@
<footer class="footer mt-5">
<div class="content has-text-centered">
<p>
<strong>Futureporn <%= meta('version') %></strong> made with love by <a href="https://t.co/I8p0oH0AAB">@CJ_Clippy</a>.
<strong>Futureporn <%= meta('version') %></strong> made with love by <a href="https://cjclippy.carrd.co/">@CJ_Clippy</a>.
</p>
<div class="container">
<div class="notification is-info">Hello World</div>
<% if (!auth) { %>
<div class="notification is-info">
<p>Hi thanks for visiting! 🤠</p>
<p>We are receiving a lot of traffic and are forced to restrict playback to logged in visitors.</p>
<p>To watch a vod, please log in.</p>
</div>
<% } %>
<div class="notification is-info">
<p>Torrent downloads are coming soon.</p>
</div>
<div class="notification">
<p>Futureporn is no longer using IPFS. <a target="_blank" href="https://www.patreon.com/posts/143616210">Read more</a></p>
</div>
</div>
</div>
</footer>

View File

@ -5,8 +5,7 @@
*
* This middleware handles setting data.user for auth purposes
*/
module.exports = function ({ meta, redirect, request, auth }) {
module.exports = function ({ auth }, next) {
let user;
@ -14,5 +13,5 @@ module.exports = function ({ meta, redirect, request, auth }) {
user = $app.findFirstRecordByData('users', 'id', auth.id);
}
return { user }
next({ user })
}

View File

@ -1,9 +1,10 @@
module.exports = (api, next) => {
const { auth, redirect } = api
module.exports = ({ auth, redirect }, next) => {
if (!auth) {
return redirect('/auth/login', {
message: 'You must be logged in to access this page',
})
}
console.log('calling next()', next)
next()
}

View File

@ -1,8 +1,18 @@
<script server>
let error = null
const methods = pb().collection('users').listAuthMethods();
dbg({
methods
dbg(JSON.stringify(params, null, 2))
let returnTo = params?.returnTo || '/account';
request.event.setCookie({
name: 'returnTo',
value: returnTo,
path: '/',
maxAge: 900,
secure: true,
httpOnly: true,
sameSite: true
})
const {

View File

@ -1,4 +1,5 @@
<script server>
let returnTo = params?.returnTo || '/';
signOut()
redirect(`/`)
redirect(returnTo);
</script>

View File

@ -6,11 +6,13 @@
let error = null
let authData = null
let returnTo = request.cookies("returnTo") || '/account';
try {
authData = signInWithOAuth2(state, code)
dbg("the shit user id is " + authData.record.id)
if (!authData.record.id) {
throw new Error('invalid authData. authData must contain a record.id');
}
@ -35,8 +37,8 @@
dbg("the user is NOT new");
}
console.log(JSON.stringify(authData, null, 2))
dbg("authData:", stringify(authData))
// console.log(JSON.stringify(authData, null, 2))
// dbg("authData:", stringify(authData))
user.set('patreonAccessToken', authData.meta.accessToken);
@ -55,7 +57,8 @@
// The backend later removes the status if they aren't subscribed
// let user = $app.findRecordById('user', )
response.redirect('/')
dbg(`redirecting to ${returnTo}. cookies=${JSON.stringify(request.cookies(), null, 2)}`);
response.redirect(returnTo);
} catch (e) {
error = e.message
}

View File

@ -6,11 +6,13 @@
try {
provider = body().provider
const url = requestOAuth2Login(provider)
console.log('url is as follows')
console.log(url)
const returnTo = params?.returnTo || '/';
response.redirect(url)
} catch (error) {
console.log("FAILURE! We failed to get the OAuth2Login from provider", provider)
console.error("FAILURE! We failed to get the OAuth2Login from provider", provider)
error = error.message
}
}

View File

@ -21,7 +21,7 @@ module.exports = function (api) {
username: p.get('publicUsername'),
}));
console.log('Patrons:', JSON.stringify(patrons, null, 2));
// console.log('Patrons:', JSON.stringify(patrons, null, 2));
return { patrons };

View File

@ -13,11 +13,11 @@ module.exports = function (api) {
console.error('error!', e.message);
if (e.message.match(/no rows/)) {
console.log('we are sending 404')
console.error('we are sending 404')
return response.html(404, 'VOD not found')
} else {
console.log('we are sending error 500')
console.error('we are sending error 500')
return response.html(500, 'Unknown internal error while fetching vod')
}

View File

@ -1,3 +1,21 @@
<% if (!data?.user) { %>
<div class="notification is-warning">
<span class="icon-text">
<span class="icon">
<svg xmlns="http://www.w3.org/2000/svg" width="24" height="24" viewBox="0 0 24 24">
<g fill="none" fill-rule="evenodd">
<path d="m12.593 23.258l-.011.002l-.071.035l-.02.004l-.014-.004l-.071-.035q-.016-.005-.024.005l-.004.01l-.017.428l.005.02l.01.013l.104.074l.015.004l.012-.004l.104-.074l.012-.016l.004-.017l-.017-.427q-.004-.016-.017-.018m.265-.113l-.013.002l-.185.093l-.01.01l-.003.011l.018.43l.005.012l.008.007l.201.093q.019.005.029-.008l.004-.014l-.034-.614q-.005-.018-.02-.022m-.715.002a.02.02 0 0 0-.027.006l-.006.014l-.034.614q.001.018.017.024l.015-.002l.201-.093l.01-.008l.004-.011l.017-.43l-.003-.012l-.01-.01z" />
<path fill="currentColor" d="M20 3a2 2 0 0 1 1.995 1.85L22 5v14a2 2 0 0 1-1.85 1.995L20 21H4a2 2 0 0 1-1.995-1.85L2 19V5a2 2 0 0 1 1.85-1.995L4 3zm0 2H4v14h16zm-9.66 2.638l.518.23l.338.16l.387.19l.43.218l.47.25l.507.28l.266.152l.518.305l.474.292l.43.273l.38.253l.48.33l.364.263l.095.07a1.234 1.234 0 0 1 0 1.98l-.323.235l-.44.308l-.356.239l-.405.263l-.453.283l-.499.3l-.534.309l-.509.282l-.471.25l-.43.22l-.386.188l-.622.288l-.23.1a1.234 1.234 0 0 1-1.714-.99l-.058-.565l-.032-.374l-.042-.664l-.023-.508l-.015-.555l-.004-.294l-.002-.305q0-.31.006-.6l.015-.555l.023-.507l.027-.457l.03-.401l.075-.744a1.235 1.235 0 0 1 1.715-.992m.611 2.501l-.436-.218l-.029.487l-.022.551l-.013.61l-.002.325l.002.325l.013.609l.01.283l.026.52l.015.235l.434-.218l.487-.256l.535-.294l.284-.162l.551-.326l.494-.306l.436-.28l.196-.13l-.407-.27l-.466-.294a30 30 0 0 0-.803-.48l-.283-.161l-.534-.294z" />
</g>
</svg>
</span>
<span>For video playback options, please <a href="/auth/login?returnTo=<%= encodeURIComponent(request.url.href) %>">login</a></span>
</span>
</div>
<% } else { %>
<div class="players" <% if (data?.user?.get('patron')) { %> data-signals="{'selected':'cdn1'}" <% } else { %> data-signals="{'selected':'cdn2'}" <% } %>>
<!-- CDN2 Player (B2) -->
@ -20,7 +38,6 @@
<% } %>
</div>
<!-- Player toggle buttons -->
<nav class="level mt-5">
<div class="level-left">
@ -32,6 +49,7 @@
<button class="button is-success" data-on-click="$selected = 'cdn2'">CDN2 player</button>
</div>
</nav>
<% } %>
<!-- VOD Details -->
<div class="vod-details box mt-5">
@ -59,10 +77,6 @@
</p>
<% } %>
<% if (data.vod?.get('ipfsCid')) { %>
<p><b id="ipfs-cid">IPFS CID:</b> <%= data.vod?.get('ipfsCid') %></p>
<% } %>
<% if (data.vod?.get('magnetLink')) { %>
<p><b id="magnet-link">Magnet Link:</b> <%= data.vod?.get('magnetLink') %></p>
<% } %>

View File

@ -4,9 +4,7 @@
/** @type {import('pocketpages').PageDataLoaderFunc} */
module.exports = function (api) {
const { params, request, pb } = api;
module.exports = function ({ params, request, pb }, next) {
const perPage = params.perPage || 25;
const page = params.page || 1;
const vtuber = $app.findFirstRecordByData('vtubers', 'slug', params.slug);
@ -33,7 +31,7 @@ module.exports = function (api) {
// console.log(eerrs)
return { vods, vtuber };
next({ vods, vtuber });
};

View File

@ -13,7 +13,7 @@
<%# VTuber Image %>
<figure class="image is-128x128">
<img src="/api/files/vtubers/<%= data.vtuber?.id %>/<%= data.vtuber?.get('image') %>?thumb=128x128" alt="<%= data.vtuber?.get?.('displayName') || 'VTuber' %>" />
<img src="/api/files/vtubers/<%= data.vtuber?.id %>/<%= data.vtuber?.get('image') %>?height=128&width=128" alt="<%= data.vtuber?.get?.('displayName') || 'VTuber' %>" />
</figure>
</div>
<div class="column">

View File

@ -6,10 +6,10 @@
* This middleware populates pocketpages meta fields
*/
module.exports = function ({ meta, redirect, request, auth }) {
module.exports = function ({ meta, redirect, request, auth }, next) {
meta('title', 'Futureporn.net')
meta('description', 'Dedication to the preservation of Lewdtuber history')
meta('image', 'https://futureporn.net/assets/logo.png')
meta('image', 'https://futureporn.net/logo.png')
meta('version', require(`../../package.json`)?.version)
next()
}

View File

@ -10,7 +10,7 @@
let user;
if (auth) {
console.log('request.auth is present id:', auth.get('id'))
// console.log('request.auth is present id:', auth.get('id'))
user = $app.findFirstRecordByData('users', 'id', auth.id);
}
@ -23,7 +23,7 @@
}
const signals = datastar.readSignals(request, {})
console.log('signals as followssssssss', JSON.stringify(signals));
// console.log('signals as followssssssss', JSON.stringify(signals));
user.set('publicUsername', signals.publicUsername);
$app.save(user);

View File

@ -0,0 +1,25 @@
// +middleware.js
// load vods for the vod feed
/** @type {import('pocketpages').PageDataLoaderFunc} */
module.exports = function ({ params, response }, next) {
try {
const vods = $app.findRecordsByFilter('vods', null, '-streamDate', 25);
$app.expandRecords(vods, ["vtubers"], null);
next({ vods });
} catch (e) {
console.error('error!', e.message);
if (e.message.match(/no rows/)) {
console.error('we are sending 404')
return response.html(404, 'VODs not found')
} else {
console.error('we are sending error 500')
return response.html(500, 'Unknown internal error while fetching vods')
}
}
};

View File

@ -12,7 +12,7 @@ const feed = {
home_page_url: "https://futureporn.net",
feed_url: "https://futureporn.net/vods/feed.json",
description: meta('description'),
icon: "https://futureporn.net/assets/logo.png",
icon: "https://futureporn.net/logo.png",
author: {
name: "CJ_Clippy",
url: "https://futureporn.net"
@ -24,7 +24,6 @@ const feed = {
title: vod.title,
announceUrl: vod.get('announceUrl'),
id: vod.get('id'),
ipfsCid: vod.get('ipfsCid'),
magnetLink: vod.get('magnetLink'),
image: vod.get('thumbnail'),
date_modified: vod.get('updated'),

Binary file not shown.

After

Width:  |  Height:  |  Size: 928 B

View File

@ -0,0 +1,55 @@
// 2025-11-16-rm-fake-sourceVideo.ts
import PocketBase from 'pocketbase';
import { readFileSync } from 'node:fs';
import { basename, join } from 'node:path';
import spawn from 'nano-spawn';
import { tmpdir } from 'node:os';
import mime from 'mime';
const pb = new PocketBase(process.env.POCKETBASE_URL || 'http://127.0.0.1:8090');
if (!process.env.POCKETBASE_USERNAME) throw new Error('POCKETBASE_USERNAME missing');
if (!process.env.POCKETBASE_PASSWORD) throw new Error('POCKETBASE_PASSWORD missing');
interface ManifestItem {
thumbnail_url: string;
thumbnail_key: string;
tmp_file: string;
vod_id: string;
}
type Manifest = ManifestItem[];
interface Vod {
id: string;
thumbnail: string;
streamDate: string;
}
const vodsCollectionName = 'pbc_144770472';
// pbc_144770472 is the vods collection
// cv6m31vj98gmtsx is a sample vod id
async function main() {
console.log('Authenticating with PocketBase...');
await pb
.collection("_superusers")
.authWithPassword(process.env.POCKETBASE_USERNAME!, process.env.POCKETBASE_PASSWORD!);
const vods = await pb.collection('vods').getFullList({ filter: 'sourceVideo ~ "content/"' });
console.log(`${vods.length} vods.`);
for (const [i, vod] of vods.entries()) {
console.log("processing vod " + i + "/" + vods.length + " " + vod.id);
await pb.collection('vods').update(vod.id, { sourceVideo: '' });
}
console.log("All done.");
}
main()

View File

@ -8,6 +8,9 @@ async function main() {
// upload the site
await spawn('rsync', [
'-avz',
'--delete',
'--exclude=pocketbase',
'--exclude=std.log',
'--exclude=pb_data',
'--exclude=*.local',
'-e',

1
services/worker/.config/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
qBittorrent.conf

View File

@ -1,8 +1,9 @@
import { Queue as QueueMQ, Worker, type QueueOptions, type JobsOptions, type Job } from 'bullmq';
import env from './env';
console.log(`using VALKEY_PORT=${env.VALKEY_PORT}`);
export const connection: QueueOptions['connection'] = {
host: '127.0.0.1',
port: 6379,
port: Number(env.VALKEY_PORT),
password: ''
};

View File

@ -2,6 +2,8 @@
const env = (() => {
if (!process.env.POCKETBASE_URL) throw new Error('POCKETBASE_URL missing in env');
if (!process.env.PORT) throw new Error('PORT missing in env');
if (!process.env.WORKER_PORT) throw new Error('WORKER_PORT missing in env');
if (!process.env.VALKEY_PORT) throw new Error('VALKEY_PORT missing in env');
if (!process.env.POCKETBASE_USERNAME) throw new Error('POCKETBASE_USERNAME missing in env');
if (!process.env.POCKETBASE_PASSWORD) throw new Error('POCKETBASE_PASSWORD missing in env');
if (!process.env.MUX_TOKEN_ID) throw new Error('MUX_TOKEN_ID missing in env');
@ -16,11 +18,28 @@ const env = (() => {
if (!process.env.AWS_SECRET_ACCESS_KEY) throw new Error('AWS_SECRET_ACCESS_KEY missing in env');
if (!process.env.AWS_REGION) throw new Error('AWS_REGION missing in env');
if (!process.env.AWS_ENDPOINT) throw new Error('AWS_ENDPOINT missing in env');
if (!process.env.V1_AWS_BUCKET) throw new Error('V1_AWS_BUCKET missing in env');
if (!process.env.V1_AWS_ACCESS_KEY_ID) throw new Error('V1_AWS_ACCESS_KEY_ID missing in env');
if (!process.env.V1_AWS_SECRET_ACCESS_KEY) throw new Error('V1_AWS_SECRET_ACCESS_KEY missing in env');
if (!process.env.V1_AWS_REGION) throw new Error('V1_AWS_REGION missing in env');
if (!process.env.V1_AWS_ENDPOINT) throw new Error('V1_AWS_ENDPOINT missing in env');
if (!process.env.FANSLY_USERNAME) throw new Error('FANSLY_USERNAME missing in env');
if (!process.env.FANSLY_PASSWORD) throw new Error('FANSLY_PASSWORD missing in env');
if (!process.env.APIFY_TOKEN) throw new Error('APIFY_TOKEN missing in env');
if (!process.env.NODE_ENV) throw new Error('APIFY_TOKEN missing in env');
if (!process.env.CACHE_ROOT) throw new Error('CACHE_ROOT missing in env');
if (!process.env.SEEDBOX_SFTP_URL) throw new Error('SEEDBOX_SFTP_URL missing in env');
if (!process.env.SEEDBOX_SFTP_USERNAME) throw new Error('SEEDBOX_SFTP_USERNAME missing in env');
if (!process.env.SEEDBOX_SFTP_PASSWORD) throw new Error('SEEDBOX_SFTP_PASSWORD missing in env');
if (!process.env.QBT_HOST) throw new Error('QBT_HOST missing in env');
if (!process.env.QBT_PORT) throw new Error('QBT_PORT missing in env');
if (!process.env.QBT_PASSWORD) throw new Error('QBT_PASSWORD missing in env');
if (!process.env.QBT_USERNAME) throw new Error('QBT_USERNAME missing in env');
const {
PORT,
WORKER_PORT,
POCKETBASE_URL,
POCKETBASE_USERNAME,
POCKETBASE_PASSWORD,
@ -36,11 +55,28 @@ const env = (() => {
AWS_SECRET_ACCESS_KEY,
AWS_REGION,
AWS_ENDPOINT,
V1_AWS_BUCKET,
V1_AWS_ACCESS_KEY_ID,
V1_AWS_SECRET_ACCESS_KEY,
V1_AWS_REGION,
V1_AWS_ENDPOINT,
FANSLY_USERNAME,
FANSLY_PASSWORD,
APIFY_TOKEN,
NODE_ENV,
CACHE_ROOT,
SEEDBOX_SFTP_URL,
SEEDBOX_SFTP_USERNAME,
SEEDBOX_SFTP_PASSWORD,
VALKEY_PORT,
QBT_HOST,
QBT_USERNAME,
QBT_PASSWORD,
QBT_PORT,
} = process.env
return {
PORT,
WORKER_PORT,
POCKETBASE_URL,
POCKETBASE_USERNAME,
POCKETBASE_PASSWORD,
@ -56,8 +92,24 @@ const env = (() => {
AWS_SECRET_ACCESS_KEY,
AWS_REGION,
AWS_ENDPOINT,
V1_AWS_BUCKET,
V1_AWS_ACCESS_KEY_ID,
V1_AWS_SECRET_ACCESS_KEY,
V1_AWS_REGION,
V1_AWS_ENDPOINT,
FANSLY_PASSWORD,
FANSLY_USERNAME,
APIFY_TOKEN,
NODE_ENV,
CACHE_ROOT,
SEEDBOX_SFTP_URL,
SEEDBOX_SFTP_USERNAME,
SEEDBOX_SFTP_PASSWORD,
VALKEY_PORT,
QBT_HOST,
QBT_USERNAME,
QBT_PASSWORD,
QBT_PORT,
}
})()

View File

@ -0,0 +1,2 @@
{
}

View File

@ -0,0 +1,2 @@
{
}

View File

@ -0,0 +1,2 @@
{
}

View File

@ -1,29 +1,30 @@
{
"compilerOptions": {
// Environment setup & latest features
"lib": ["ESNext"],
"lib": [
"ESNext"
],
"target": "ESNext",
"module": "Preserve",
"moduleDetection": "force",
"jsx": "react-jsx",
"allowJs": true,
"types": [
"node"
],
// Bundler mode
"moduleResolution": "bundler",
"allowImportingTsExtensions": true,
"verbatimModuleSyntax": true,
"noEmit": true,
// Best practices
"strict": true,
"skipLibCheck": true,
"noFallthroughCasesInSwitch": true,
"noUncheckedIndexedAccess": true,
"noImplicitOverride": true,
// Some stricter flags (disabled by default)
"noUnusedLocals": false,
"noUnusedParameters": false,
"noPropertyAccessFromIndexSignature": false
}
}
}

View File

@ -1,3 +0,0 @@
#!/bin/bash
/home/cj/.nvm/versions/node/v22.18.0/bin/node --import tsx ./src/index.ts

View File

@ -12,6 +12,10 @@
"@mux/mux-node": "^12.8.0",
"@types/express": "^5.0.5",
"@types/js-yaml": "^4.0.9",
"@types/node": "^24.10.1",
"@types/semver": "^7.7.1",
"@types/ssh2": "^1.15.5",
"apify-client": "^2.19.0",
"bullmq": "^5.63.0",
"date-fns": "^4.1.0",
"fs-extra": "^11.3.2",
@ -23,8 +27,10 @@
"puppeteer": "^24.30.0",
"puppeteer-extra": "^3.3.6",
"puppeteer-extra-plugin-stealth": "^2.11.2",
"semver": "^7.7.3",
"sharp": "^0.34.5",
"slugify": "^1.6.6",
"ssh2": "^1.17.0",
"which": "^5.0.0"
},
"devDependencies": {
@ -35,6 +41,32 @@
"typescript": "^5"
}
},
"node_modules/@apify/consts": {
"version": "2.47.0",
"resolved": "https://registry.npmjs.org/@apify/consts/-/consts-2.47.0.tgz",
"integrity": "sha512-VFVrNn/S3bLPrS3v9x7Td5b8YtxPbrepuXLWu27n06T34qQeogysbnAWIBNnTQ59qRKlxrAeFR4ezI+fRXTMNQ==",
"license": "Apache-2.0"
},
"node_modules/@apify/log": {
"version": "2.5.26",
"resolved": "https://registry.npmjs.org/@apify/log/-/log-2.5.26.tgz",
"integrity": "sha512-o/Ciki7UwaXwdJ+tvTg7WG8Q3C+hZXYUpo2FrTG7Upr1PQW45PGfwLDuJxteIBsEXjIHtWitlLi4AZwG7mtRMQ==",
"license": "Apache-2.0",
"dependencies": {
"@apify/consts": "^2.47.0",
"ansi-colors": "^4.1.1"
}
},
"node_modules/@apify/utilities": {
"version": "2.23.0",
"resolved": "https://registry.npmjs.org/@apify/utilities/-/utilities-2.23.0.tgz",
"integrity": "sha512-WAyenlKvtXtvd6V8D2fYwbsmc3dMn3z02JaOhZNx/p8u0NuacNgoLk/+PW4IPWrNxhqgDQZqde4cpNfZOktvsQ==",
"license": "Apache-2.0",
"dependencies": {
"@apify/consts": "^2.47.0",
"@apify/log": "^2.5.26"
}
},
"node_modules/@babel/code-frame": {
"version": "7.27.1",
"resolved": "https://registry.npmjs.org/@babel/code-frame/-/code-frame-7.27.1.tgz",
@ -91,6 +123,18 @@
"@bull-board/api": "6.14.0"
}
},
"node_modules/@crawlee/types": {
"version": "3.15.3",
"resolved": "https://registry.npmjs.org/@crawlee/types/-/types-3.15.3.tgz",
"integrity": "sha512-RvgVPXrsQw4GQIUXrC1z1aNOedUPJnZ/U/8n+jZ0fu1Iw9moJVMuiuIxSI8q1P6BA84aWZdalyfDWBZ3FMjsiw==",
"license": "Apache-2.0",
"dependencies": {
"tslib": "^2.4.0"
},
"engines": {
"node": ">=16.0.0"
}
},
"node_modules/@emnapi/runtime": {
"version": "1.7.0",
"resolved": "https://registry.npmjs.org/@emnapi/runtime/-/runtime-1.7.0.tgz",
@ -1452,6 +1496,18 @@
"win32"
]
},
"node_modules/@sindresorhus/is": {
"version": "4.6.0",
"resolved": "https://registry.npmjs.org/@sindresorhus/is/-/is-4.6.0.tgz",
"integrity": "sha512-t09vSN3MdfsyCHoFcTRCH/iUtG7OJ0CsjzB8cjAmKc/va/kIgeDI/TxsigdncE/4be734m0cvIYwNaV4i2XqAw==",
"license": "MIT",
"engines": {
"node": ">=10"
},
"funding": {
"url": "https://github.com/sindresorhus/is?sponsor=1"
}
},
"node_modules/@standard-schema/spec": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/@standard-schema/spec/-/spec-1.0.0.tgz",
@ -1594,6 +1650,12 @@
"integrity": "sha512-hKormJbkJqzQGhziax5PItDUTMAM9uE2XXQmM37dyd4hVM+5aVl7oVxMVUiVQn2oCQFN/LKCZdvSM0pFRqbSmQ==",
"license": "MIT"
},
"node_modules/@types/semver": {
"version": "7.7.1",
"resolved": "https://registry.npmjs.org/@types/semver/-/semver-7.7.1.tgz",
"integrity": "sha512-FmgJfu+MOcQ370SD0ev7EI8TlCAfKYU+B4m5T3yXc1CiRN94g/SZPtsCkk506aUDtlMnFZvasDwHHUcZUEaYuA==",
"license": "MIT"
},
"node_modules/@types/send": {
"version": "1.2.1",
"resolved": "https://registry.npmjs.org/@types/send/-/send-1.2.1.tgz",
@ -1624,6 +1686,30 @@
"@types/node": "*"
}
},
"node_modules/@types/ssh2": {
"version": "1.15.5",
"resolved": "https://registry.npmjs.org/@types/ssh2/-/ssh2-1.15.5.tgz",
"integrity": "sha512-N1ASjp/nXH3ovBHddRJpli4ozpk6UdDYIX4RJWFa9L1YKnzdhTlVmiGHm4DZnj/jLbqZpes4aeR30EFGQtvhQQ==",
"license": "MIT",
"dependencies": {
"@types/node": "^18.11.18"
}
},
"node_modules/@types/ssh2/node_modules/@types/node": {
"version": "18.19.130",
"resolved": "https://registry.npmjs.org/@types/node/-/node-18.19.130.tgz",
"integrity": "sha512-GRaXQx6jGfL8sKfaIDD6OupbIHBr9jv7Jnaml9tB7l4v068PAOXqfcujMMo5PhbIs6ggR1XODELqahT2R8v0fg==",
"license": "MIT",
"dependencies": {
"undici-types": "~5.26.4"
}
},
"node_modules/@types/ssh2/node_modules/undici-types": {
"version": "5.26.5",
"resolved": "https://registry.npmjs.org/undici-types/-/undici-types-5.26.5.tgz",
"integrity": "sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==",
"license": "MIT"
},
"node_modules/@types/yauzl": {
"version": "2.10.3",
"resolved": "https://registry.npmjs.org/@types/yauzl/-/yauzl-2.10.3.tgz",
@ -1808,6 +1894,15 @@
"node": ">= 8.0.0"
}
},
"node_modules/ansi-colors": {
"version": "4.1.3",
"resolved": "https://registry.npmjs.org/ansi-colors/-/ansi-colors-4.1.3.tgz",
"integrity": "sha512-/6w/C21Pm1A7aZitlI5Ni/2J6FFQN8i1Cvz3kHABAAbw93v/NlvKdVOqz7CCWz/3iv/JplRSEEZ83XION15ovw==",
"license": "MIT",
"engines": {
"node": ">=6"
}
},
"node_modules/ansi-regex": {
"version": "5.0.1",
"resolved": "https://registry.npmjs.org/ansi-regex/-/ansi-regex-5.0.1.tgz",
@ -1832,6 +1927,25 @@
"url": "https://github.com/chalk/ansi-styles?sponsor=1"
}
},
"node_modules/apify-client": {
"version": "2.19.0",
"resolved": "https://registry.npmjs.org/apify-client/-/apify-client-2.19.0.tgz",
"integrity": "sha512-cDYwbygx/OplyF9MXTeb70nKwZHQQIp5OodsPeikWrh8sHmfeKWUu9jUUzeiWpHIPcwroYrP7KxA6UDSBGY3kQ==",
"license": "Apache-2.0",
"dependencies": {
"@apify/consts": "^2.42.0",
"@apify/log": "^2.2.6",
"@apify/utilities": "^2.18.0",
"@crawlee/types": "^3.3.0",
"agentkeepalive": "^4.2.1",
"async-retry": "^1.3.3",
"axios": "^1.6.7",
"content-type": "^1.0.5",
"ow": "^0.28.2",
"tslib": "^2.5.0",
"type-fest": "^4.0.0"
}
},
"node_modules/argparse": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/argparse/-/argparse-2.0.1.tgz",
@ -1847,6 +1961,15 @@
"node": ">=0.10.0"
}
},
"node_modules/asn1": {
"version": "0.2.6",
"resolved": "https://registry.npmjs.org/asn1/-/asn1-0.2.6.tgz",
"integrity": "sha512-ix/FxPn0MDjeyJ7i/yoHGFt/EX6LyNbxSEhPPXODPL+KB0VPk86UYfL0lMdy+KCnv+fmvIzySwaK5COwqVbWTQ==",
"license": "MIT",
"dependencies": {
"safer-buffer": "~2.1.0"
}
},
"node_modules/assertion-error": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-2.0.1.tgz",
@ -1875,10 +1998,30 @@
"integrity": "sha512-htCUDlxyyCLMgaM3xXg0C0LW2xqfuQ6p05pCEIsXuyQ+a1koYKTuBMzRNwmybfLgvJDMd0r1LTn4+E0Ti6C2AA==",
"license": "MIT"
},
"node_modules/async-retry": {
"version": "1.3.3",
"resolved": "https://registry.npmjs.org/async-retry/-/async-retry-1.3.3.tgz",
"integrity": "sha512-wfr/jstw9xNi/0teMHrRW7dsz3Lt5ARhYNZ2ewpadnhaIp5mbALhOAP+EAdsC7t4Z6wqsDVv9+W6gm1Dk9mEyw==",
"license": "MIT",
"dependencies": {
"retry": "0.13.1"
}
},
"node_modules/asynckit": {
"version": "0.4.0",
"license": "MIT"
},
"node_modules/axios": {
"version": "1.13.2",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.13.2.tgz",
"integrity": "sha512-VPk9ebNqPcy5lRGuSlKx752IlDatOjT9paPlm8A7yOuW2Fbvp4X3JznJtT4f0GzGLLiWE9W8onz51SqLYwzGaA==",
"license": "MIT",
"dependencies": {
"follow-redirects": "^1.15.6",
"form-data": "^4.0.4",
"proxy-from-env": "^1.1.0"
}
},
"node_modules/b4a": {
"version": "1.7.3",
"resolved": "https://registry.npmjs.org/b4a/-/b4a-1.7.3.tgz",
@ -1999,6 +2142,15 @@
"node": ">=10.0.0"
}
},
"node_modules/bcrypt-pbkdf": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/bcrypt-pbkdf/-/bcrypt-pbkdf-1.0.2.tgz",
"integrity": "sha512-qeFIXtP4MSoi6NLqO12WfqARWWuCKi2Rn/9hJLEmtB5yTNr9DqFWkJRCf2qShWzPeAMRnOgCrq0sg/KLv5ES9w==",
"license": "BSD-3-Clause",
"dependencies": {
"tweetnacl": "^0.14.3"
}
},
"node_modules/body-parser": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/body-parser/-/body-parser-2.2.0.tgz",
@ -2037,6 +2189,15 @@
"node": "*"
}
},
"node_modules/buildcheck": {
"version": "0.0.6",
"resolved": "https://registry.npmjs.org/buildcheck/-/buildcheck-0.0.6.tgz",
"integrity": "sha512-8f9ZJCUXyT1M35Jx7MkBgmBMo3oHTTBIPLiY9xyL0pl3T5RwcPEY8cUHr5LBNfu/fk6c2T4DJZuVM/8ZZT2D2A==",
"optional": true,
"engines": {
"node": ">=10.0.0"
}
},
"node_modules/bullmq": {
"version": "5.63.0",
"license": "MIT",
@ -2254,6 +2415,20 @@
}
}
},
"node_modules/cpu-features": {
"version": "0.0.10",
"resolved": "https://registry.npmjs.org/cpu-features/-/cpu-features-0.0.10.tgz",
"integrity": "sha512-9IkYqtX3YHPCzoVg1Py+o9057a3i0fp7S530UWokCSaFVTc7CwXPRiOjRjBQQ18ZCNafx78YfnG+HALxtVmOGA==",
"hasInstallScript": true,
"optional": true,
"dependencies": {
"buildcheck": "~0.0.6",
"nan": "^2.19.0"
},
"engines": {
"node": ">=10.0.0"
}
},
"node_modules/cron-parser": {
"version": "4.9.0",
"license": "MIT",
@ -2357,6 +2532,21 @@
"integrity": "sha512-vhE6eymDQSKWUXwwA37NtTTVEzjtGVfDr3pRbsWEQ5onH/Snp2c+2xZHWJJawG/0hCCJLRGt4xVtEVUVILol4w==",
"license": "BSD-3-Clause"
},
"node_modules/dot-prop": {
"version": "6.0.1",
"resolved": "https://registry.npmjs.org/dot-prop/-/dot-prop-6.0.1.tgz",
"integrity": "sha512-tE7ztYzXHIeyvc7N+hR3oi7FIbf/NIjVP9hmAt3yMXzrQ072/fpjGLx2GxNxGxUl5V73MEqYzioOMoVhGMJ5cA==",
"license": "MIT",
"dependencies": {
"is-obj": "^2.0.0"
},
"engines": {
"node": ">=10"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/dunder-proto": {
"version": "1.0.1",
"license": "MIT",
@ -2778,6 +2968,26 @@
"integrity": "sha512-MI1qs7Lo4Syw0EOzUl0xjs2lsoeqFku44KpngfIduHBYvzm8h2+7K8YMQh1JtVVVrUvhLpNwqVi4DERegUJhPQ==",
"license": "Apache-2.0"
},
"node_modules/follow-redirects": {
"version": "1.15.11",
"resolved": "https://registry.npmjs.org/follow-redirects/-/follow-redirects-1.15.11.tgz",
"integrity": "sha512-deG2P0JfjrTxl50XGCDyfI97ZGVCxIpfKYmfyrQ54n5FO/0gfIES8C/Psl6kWVDolizcaaxZJnTS0QSMxvnsBQ==",
"funding": [
{
"type": "individual",
"url": "https://github.com/sponsors/RubenVerborgh"
}
],
"license": "MIT",
"engines": {
"node": ">=4.0"
},
"peerDependenciesMeta": {
"debug": {
"optional": true
}
}
},
"node_modules/for-in": {
"version": "1.0.2",
"resolved": "https://registry.npmjs.org/for-in/-/for-in-1.0.2.tgz",
@ -3243,6 +3453,15 @@
"node": ">=8"
}
},
"node_modules/is-obj": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/is-obj/-/is-obj-2.0.0.tgz",
"integrity": "sha512-drqDG3cbczxxEJRoOXcOjtdp1J/lyp1mNn0xaznRs8+muBhgQcrnbspox5X5fOw0HnMnbfDzvnEMEtqDEJEo8w==",
"license": "MIT",
"engines": {
"node": ">=8"
}
},
"node_modules/is-plain-object": {
"version": "2.0.4",
"resolved": "https://registry.npmjs.org/is-plain-object/-/is-plain-object-2.0.4.tgz",
@ -3380,6 +3599,13 @@
"version": "3.1.0",
"license": "MIT"
},
"node_modules/lodash.isequal": {
"version": "4.5.0",
"resolved": "https://registry.npmjs.org/lodash.isequal/-/lodash.isequal-4.5.0.tgz",
"integrity": "sha512-pDo3lu8Jhfjqls6GkMgpahsF9kCyayhgykjyLMNFTKWrpVdAQtYyB4muAMWozBB4ig/dtWAmsMxLEI8wuz+DYQ==",
"deprecated": "This package is deprecated. Use require('node:util').isDeepStrictEqual instead.",
"license": "MIT"
},
"node_modules/long": {
"version": "5.3.2",
"resolved": "https://registry.npmjs.org/long/-/long-5.3.2.tgz",
@ -3542,6 +3768,13 @@
"@msgpackr-extract/msgpackr-extract-win32-x64": "3.0.3"
}
},
"node_modules/nan": {
"version": "2.23.1",
"resolved": "https://registry.npmjs.org/nan/-/nan-2.23.1.tgz",
"integrity": "sha512-r7bBUGKzlqk8oPBDYxt6Z0aEdF1G1rwlMcLk8LCOMbOzf0mG+JUfUzG4fIMWwHWP0iyaLWEQZJmtB7nOHEm/qw==",
"license": "MIT",
"optional": true
},
"node_modules/nano-spawn": {
"version": "2.0.0",
"resolved": "https://registry.npmjs.org/nano-spawn/-/nano-spawn-2.0.0.tgz",
@ -3695,6 +3928,25 @@
"protobufjs": "^7.2.4"
}
},
"node_modules/ow": {
"version": "0.28.2",
"resolved": "https://registry.npmjs.org/ow/-/ow-0.28.2.tgz",
"integrity": "sha512-dD4UpyBh/9m4X2NVjA+73/ZPBRF+uF4zIMFvvQsabMiEK8x41L3rQ8EENOi35kyyoaJwNxEeJcP6Fj1H4U409Q==",
"license": "MIT",
"dependencies": {
"@sindresorhus/is": "^4.2.0",
"callsites": "^3.1.0",
"dot-prop": "^6.0.1",
"lodash.isequal": "^4.5.0",
"vali-date": "^1.0.0"
},
"engines": {
"node": ">=12"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/pac-proxy-agent": {
"version": "7.2.0",
"resolved": "https://registry.npmjs.org/pac-proxy-agent/-/pac-proxy-agent-7.2.0.tgz",
@ -4256,6 +4508,15 @@
"url": "https://github.com/privatenumber/resolve-pkg-maps?sponsor=1"
}
},
"node_modules/retry": {
"version": "0.13.1",
"resolved": "https://registry.npmjs.org/retry/-/retry-0.13.1.tgz",
"integrity": "sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==",
"license": "MIT",
"engines": {
"node": ">= 4"
}
},
"node_modules/rimraf": {
"version": "3.0.2",
"resolved": "https://registry.npmjs.org/rimraf/-/rimraf-3.0.2.tgz",
@ -4358,6 +4619,8 @@
},
"node_modules/semver": {
"version": "7.7.3",
"resolved": "https://registry.npmjs.org/semver/-/semver-7.7.3.tgz",
"integrity": "sha512-SdsKMrI9TdgjdweUSR9MweHA4EJ8YxHn8DFaDisvhVlUOe4BF1tLD7GAj0lIqWVl+dPb/rExr0Btby5loQm20Q==",
"license": "ISC",
"bin": {
"semver": "bin/semver.js"
@ -4656,6 +4919,23 @@
"node": ">=0.10.0"
}
},
"node_modules/ssh2": {
"version": "1.17.0",
"resolved": "https://registry.npmjs.org/ssh2/-/ssh2-1.17.0.tgz",
"integrity": "sha512-wPldCk3asibAjQ/kziWQQt1Wh3PgDFpC0XpwclzKcdT1vql6KeYxf5LIt4nlFkUeR8WuphYMKqUA56X4rjbfgQ==",
"hasInstallScript": true,
"dependencies": {
"asn1": "^0.2.6",
"bcrypt-pbkdf": "^1.0.2"
},
"engines": {
"node": ">=10.16.0"
},
"optionalDependencies": {
"cpu-features": "~0.0.10",
"nan": "^2.23.0"
}
},
"node_modules/stackback": {
"version": "0.0.2",
"resolved": "https://registry.npmjs.org/stackback/-/stackback-0.0.2.tgz",
@ -4832,6 +5112,24 @@
"fsevents": "~2.3.3"
}
},
"node_modules/tweetnacl": {
"version": "0.14.5",
"resolved": "https://registry.npmjs.org/tweetnacl/-/tweetnacl-0.14.5.tgz",
"integrity": "sha512-KXXFFdAbFXY4geFIwoyNK+f5Z1b7swfXABfL7HXCmoIWMKU3dmS26672A4EeQtDzLKy7SXmfBu51JolvEKwtGA==",
"license": "Unlicense"
},
"node_modules/type-fest": {
"version": "4.41.0",
"resolved": "https://registry.npmjs.org/type-fest/-/type-fest-4.41.0.tgz",
"integrity": "sha512-TeTSQ6H5YHvpqVwBRcnLDCBnDOHWYu7IvGbHT6N8AOymcr9PJGjc1GTtiWZTYg0NCgYwvnYWEkVChQAr9bjfwA==",
"license": "(MIT OR CC0-1.0)",
"engines": {
"node": ">=16"
},
"funding": {
"url": "https://github.com/sponsors/sindresorhus"
}
},
"node_modules/type-is": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/type-is/-/type-is-2.0.1.tgz",
@ -4920,6 +5218,15 @@
"uuid": "dist/esm/bin/uuid"
}
},
"node_modules/vali-date": {
"version": "1.0.0",
"resolved": "https://registry.npmjs.org/vali-date/-/vali-date-1.0.0.tgz",
"integrity": "sha512-sgECfZthyaCKW10N0fm27cg8HYTFK5qMWgypqkXMQ4Wbl/zZKx7xZICgcoxIIE+WFAP/MBL2EFwC/YvLxw3Zeg==",
"license": "MIT",
"engines": {
"node": ">=0.10.0"
}
},
"node_modules/vary": {
"version": "1.1.2",
"resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz",

View File

@ -12,6 +12,10 @@
"@mux/mux-node": "^12.8.0",
"@types/express": "^5.0.5",
"@types/js-yaml": "^4.0.9",
"@types/node": "^24.10.1",
"@types/semver": "^7.7.1",
"@types/ssh2": "^1.15.5",
"apify-client": "^2.19.0",
"bullmq": "^5.63.0",
"date-fns": "^4.1.0",
"fs-extra": "^11.3.2",
@ -23,8 +27,10 @@
"puppeteer": "^24.30.0",
"puppeteer-extra": "^3.3.6",
"puppeteer-extra-plugin-stealth": "^2.11.2",
"semver": "^7.7.3",
"sharp": "^0.34.5",
"slugify": "^1.6.6",
"ssh2": "^1.17.0",
"which": "^5.0.0"
},
"devDependencies": {

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

View File

@ -7,7 +7,7 @@ import { generalQueue } from './queues/generalQueue.ts';
import { gpuQueue } from './queues/gpuQueue.ts';
import { highPriorityQueue } from './queues/highPriorityQueue.ts';
import env from '../.config/env.ts';
import { version } from '../package.json';
const run = async () => {
@ -34,12 +34,52 @@ const run = async () => {
await import('./workers/generalWorker.ts');
await import('./workers/gpuWorker.ts');
} else {
// @todo I separated these so that they can be ran on multiple machines
// @todo I separated these so that they can be ran on multiple machines.
// @todo we should activate these based on environment variable feature flags, not NODE_ENV
await import('./workers/highPriorityWorker.ts');
await import('./workers/generalWorker.ts');
// await import('./workers/gpuWorker.ts'); // @todo implement
}
app.get('/', (req: Request, res: Response) => {
res.send(`
<style>
html {
color-scheme: light dark;
}
body {
font-family: system-ui;
font-size: 1.25rem;
line-height: 1.5;
}
img,
svg,
video {
max-width: 100%;
display: block;
}
main {
max-width: min(70ch, 100% - 4rem);
margin-inline: auto;
}
</style>
<h1>FP Worker version ${version}.</h1>
<h2>Actions</h2>
<p><i>If you find yourself clicking one of these links a lot, consider automating it by putting the job in a queue on a every or pattern timer.</i></p>
<ul>
<li><a href="/ui">Bull Dashboard</a></li>
<li><a href="/task?name=presignMuxAssets">Task: presignMuxAssets</a></li>
<li><a href="/task?name=copyV1VideoAll">Task: copyV1VideoAll</a></li>
<li><a href="/task?name=createTorrent&vodId=1234">Task: createTorrent</a></li>
<li><a href="/task?name=createMuxAsset&vodId=">Task: createMuxAsset</a></li>
</ul>
`)
})
app.use('/ui', serverAdapter.getRouter());
app.get('/task', async (req: Request, res: Response) => {
@ -51,26 +91,23 @@ const run = async () => {
const data = { vodId };
switch (name) {
case 'presignMuxAssets':
case 'syncronizePatreon':
case 'analyzeAudio':
await generalQueue.add(name, data);
break;
case '':
throw new Error('job name was missing');
case 'scheduleVodProcessing':
await gpuQueue.add(name, data);
break;
default:
throw new Error(`Unknown job name: ${name}`);
await highPriorityQueue.add(name, data);
break;
}
res.json({ ok: true });
});
app.listen(env.PORT, () => {
console.log(`Bull Dashboard running at http://localhost:${env.PORT}/ui`);
console.log('To populate the queue, run ex:');
console.log(` curl http://localhost:${env.PORT}/task?name=presignMuxAssets`);
app.listen(env.WORKER_PORT, () => {
console.log(`Bull Dashboard running at http://localhost:${env.WORKER_PORT}/ui`);
console.log(`Actions Menu: http://localhost:${env.WORKER_PORT}`);
});

View File

@ -0,0 +1,64 @@
import { Job } from "bullmq";
import { getPocketBaseClient } from "../util/pocketbase";
import Client from "pocketbase";
import { Vod } from "../types";
const foo = 'bar';
/**
* barFunction
*
* the function that does the actual work. exported so it can be tested by vitest
*/
export async function barFunction(job: Job, vod: Vod) {
job.log('doing the task');
// @todo insert real logic here
return `${foo}-${vod.id}`;
}
/**
* getApplicableVods
*
* gets the vods from the db using filters which apply to this task
*/
async function getApplicableVods(pb: Client) {
const results = await pb.collection('vods').getList(1, 3, {
filter: "videoSrcB2 != '' && magnetLink = ''"
})
const vods = results.items;
return vods;
}
/**
*
* aTemplate
*
* A template to copy for when we make new processors. Shows the general scaffolding.
*
* Remember to makes processors
* * idempotent
* * fail fast
* * DRY
*/
export async function aTemplate(job: Job) {
const pb = await getPocketBaseClient();
const vods = await getApplicableVods(pb);
job.log(`getAnnounceUrlDetails found ${vods.length} vods in need of a streamDate.`)
for (let i = 0; i < vods.length; i++) {
const vod = vods[i] as unknown as Vod;
const newlyGeneratedData = barFunction(job, vod);
await pb.collection('vods').update(vod.id, {
newlyGeneratedData
})
// Optionally update progress for bull UI
const progress = Math.round(((i + 1) / vods.length) * 100);
await job.updateProgress(progress);
}
}

View File

@ -80,7 +80,7 @@ function parseEbur128(output: string): AudioStats {
}
async function analyzeAudio(inputFile: string): Promise<AudioStats> {
async function __analyzeAudio(inputFile: string): Promise<AudioStats> {
const args = [
"-hide_banner",
"-i", inputFile,
@ -98,7 +98,7 @@ function assertPayload(payload: any): asserts payload is Payload {
if (typeof payload.vodId !== "string") throw new Error("invalid payload-- was missing vodId");
}
export default async function main(job: Job) {
export async function analyzeAudio(job: Job) {
// job.log('TIME TO ANALYZING AUDIO');
const payload = job.data;
assertPayload(payload);
@ -122,7 +122,7 @@ export default async function main(job: Job) {
await b2Download(vod.sourceVideo, videoFilePath);
const results = await analyzeAudio(videoFilePath);
const results = await __analyzeAudio(videoFilePath);
job.log(`results=${JSON.stringify(results)}`);
await vod.collection('vods').update(vodId, {

View File

@ -0,0 +1,45 @@
import { Job } from "bullmq";
import { getPocketBaseClient } from "../util/pocketbase";
import Client from "pocketbase";
import { Vod } from "../types";
import { generalQueue } from '../queues/generalQueue.ts';
/**
* getApplicableVods
*
* gets the vods that are missing sourceVideo
*/
async function getApplicableVods(pb: Client) {
const results = await pb
.collection('vods')
.getFullList({ filter: "videoSrcB2 != '' && sourceVideo = ''" });
const vods = results;
return vods;
}
/**
*
* copyV1VideoAll
*
* Copy all vod.videoSrcB2 from V1_AWS_BUCKET to vod.sourceVideo in AWS_BUCKET
*/
export async function copyV1VideoAll(job: Job) {
const pb = await getPocketBaseClient();
const vods = await getApplicableVods(pb);
job.log(`copyV1VideoAll found ${vods.length} elligible vods.`)
for (let i = 0; i < vods.length; i++) {
const vod = vods[i] as unknown as Vod;
await generalQueue.add('copyV1VideoToV3', {
vodId: vod.id
})
}
return { jobsQueued: vods.length }
}

View File

@ -0,0 +1,37 @@
import { RecordModel } from "pocketbase";
import { getTweetDates, getTweetId, tweetIdToDate } from "./getAnnounceUrlDetails";
import { test, expect, describe, vi } from 'vitest';
import { Job } from "bullmq";
import { copyBetweenBuckets } from "./copyV1VideoToV3";
describe('copyV1VideoToV3 integration', () => {
test("copyBetweenBuckets", async () => {
const dummyJob = {
name: 'test-copy-v1-video-to-v3',
log: vi.fn((arg) => {
console.log('dummyJob.log:', arg);
return;
}),
updateProgress: vi.fn().mockResolvedValue(undefined),
} as unknown as Job;
const dummyVod: RecordModel = {
id: '1234',
collectionId: 'qwertyuiop',
collectionName: 'vods',
videoSource: '',
videoSrcB2: 'https://futureporn-b2.b-cdn.net/projektmelody-fansly-2025-11-12.mp4',
}
const output = await copyBetweenBuckets(dummyJob, dummyVod);
expect(output).toBe('qwertyuiop/1234/projektmelody-fansly-2025-11-12.mp4');
expect(dummyJob.log).toHaveBeenCalled();
}, 1000 * 60 * 10);
})

View File

@ -0,0 +1,79 @@
import { Job } from "bullmq";
import { getPocketBaseClient } from "../util/pocketbase";
import Client, { RecordModel } from "pocketbase";
import { Vod } from "../types";
import { basename } from 'node:path';
import env from "../../.config/env";
import spawn from 'nano-spawn';
const foo = 'bar';
/**
* barFunction
*
* the function that does the actual work. exported so it can be tested by vitest
*/
export async function copyBetweenBuckets(job: Job, vod: RecordModel) {
const s3KeyTarget = `${vod.collectionId}/${vod.id}/${basename(vod.videoSrcB2)}`;
const from = `b2://${env.V1_AWS_BUCKET}/${vod.videoSrcB2.split('/').at(-1)}`;
const toPrefix = `b2://${env.AWS_BUCKET}`;
const to = `${toPrefix}/${s3KeyTarget}`;
job.log(`Copying ${from} ${to}. This is a slow process, please be patient.`);
// @see https://github.com/sindresorhus/nano-spawn?tab=readme-ov-file#subprocesssymbolasynciterator
// unfortunately `b2 file server-side-copy` doesn't have any progress indicator in stdout or stderr.
// so we just have to fire and forget (and wait)
const subprocess = spawn('b2', ['file', 'server-side-copy', from, to]);
await Promise.all([
(async () => {
for await (const line of subprocess.stdout) {
await job.log(`stdout: ${line}`);
}
})(),
(async () => {
for await (const line of subprocess.stderr) {
await job.log(`stderr: ${line}`);
}
})(),
]);
return to.replace(toPrefix + '/', '');
}
/**
*
* copyV1VideoToV3
*
* Copy vod.videoSrcB2 from V1 S3 bucket to V3 S3 bucket
*
* Remember to makes processors
* * idempotent
* * fail fast
* * DRY
*/
export async function copyV1VideoToV3(job: Job) {
const vodId = job.data?.vodId;
if (!vodId) throw new Error('vodId was missing from input data');
const pb = await getPocketBaseClient();
const vod = await pb.collection('vods').getOne(job.data.vodId)
const sourceVideo = await copyBetweenBuckets(job, vod);
await pb.collection('vods').update(vod.id, {
sourceVideo
});
return {
vodId,
sourceVideo
}
}

View File

@ -0,0 +1,25 @@
import { type Job } from 'bullmq';
import { getPocketBaseClient } from '../util/pocketbase';
export async function copyV2ThumbToV3(job: Job) {
const pb = await getPocketBaseClient();
job.log(`the job '${job.name}' is running`);
const vods = await pb.collection('vods').getFullList({
filter: 'thumbnail ~ ".png"',
sort: '-streamDate',
});
// console.log('vods as follows', JSON.stringify(vods));
// console.log(vods[0]);
return { complete: 'kindasorta' };
}

View File

@ -0,0 +1,16 @@
import { getTweetDates, getTweetId, tweetIdToDate } from "./getAnnounceUrlDetails";
import { test, expect, describe } from 'vitest';
describe('createMagnetLink integration', () => {
test("", async () => {
// for (const datum of data) {
// expect(datum.date).toBeTruthy();
// expect(new Date(datum.date)).toStrictEqual(new Date(datum.expectedDate));
// }
}, 30000);
})

View File

@ -0,0 +1,74 @@
import { Job } from "bullmq";
import { getPocketBaseClient } from "../util/pocketbase";
import type { Vod } from "../types";
const TWEET_URL_REGEX =
/^https?:\/\/(twitter\.com|x\.com)\/[A-Za-z0-9_]{1,15}\/status\/(\d+)(\?.*)?$/;
export function tweetIdToDate(id: string | number): Date {
const snowflake = BigInt(id);
const timestamp = (snowflake >> 22n) + 1288834974657n;
return new Date(Number(timestamp));
}
export function getTweetId(url: string) {
if (!TWEET_URL_REGEX.test(url)) {
throw new Error(`Invalid tweet URL: ${url}`);
}
return url.split('/').at(-1)!;
}
export function getTweetDates(tweetUrls: string[]): Date[] {
tweetUrls.forEach((url) => {
if (!TWEET_URL_REGEX.test(url)) {
throw new Error(`Invalid tweet URL: ${url}`);
}
});
return tweetUrls
.map(url => url.split('/').at(-1)!) // add ! if youre sure it exists
.map(tweetID => tweetIdToDate(tweetID));
}
export async function getApplicableVods() {
const pb = await getPocketBaseClient()
const results = await pb.collection('vods').getList(1, 3, {
filter: "videoSrcB2 != '' && magnetLink = ''"
})
const vods = results.items;
return vods;
}
/**
*
* createMagnetLink
*
* given a vod, scrape the tweet and populate the streamDate
*/
export async function createMagnetLinks(job: Job) {
const pb = await getPocketBaseClient();
const vods = await getApplicableVods();
job.log(`getAnnounceUrlDetails found ${vods.length} vods in need of a streamDate.`)
for (let i = 0; i < vods.length; i++) {
const vod = vods[i];
const magnetLink = await createMagnetLink(vod.videoSrcB2);
await pb.collection('vods').update(vod.id, {
magnetLink
})
const progress = Math.round(((i + 1) / vods.length) * 100);
await job.updateProgress(progress);
}
}
export async function createMagnetLink(vod: Vod) {
}

View File

@ -0,0 +1,86 @@
import { Job } from "bullmq";
import { getPocketBaseClient } from "../util/pocketbase";
import Client, { RecordModel } from "pocketbase";
import { Vod } from "../types";
interface MuxAssetCreationResponse {
data: {
video_quality: string;
status: string;
progress: {
state: string;
progress: number;
};
playback_ids: Array<{
policy: string;
id: string;
}>;
mp4_support: string;
max_resolution_tier: string;
master_access: string;
ingest_type: string;
id: string;
encoding_tier: string;
created_at: string;
};
}
/**
*
* create a mux asset on Mux.com
*/
export async function __createMuxAsset(vod: RecordModel) {
const { videoSrcB2, muxAssetId, muxPlaybackId } = vod;
if (!videoSrcB2) throw new Error(`vod ${vod.id} was missing videoSrcB2`);
if (muxAssetId !== '') throw new Error('this vod already has a muxAssetId');
if (muxPlaybackId !== '') throw new Error('this vod already has a muxPlaybackId');
const res = await fetch('https://api.mux.com/video/v1/assets', {
body: JSON.stringify({
"input": videoSrcB2,
"playback_policy": [
"signed"
]
})
})
if (!res.ok) {
const body = await res.text();
throw new Error(`Mux API error: ${res.status} ${res.statusText} ${body}`);
}
const payload = await res.json() as unknown as MuxAssetCreationResponse;
if (!payload.data.status) {
throw new Error('invalid response data received from Mux--' + JSON.stringify(payload));
}
const playbackIdEntry = payload.data.playback_ids.find((id) => id.policy === 'signed');
if (!playbackIdEntry) throw new Error('failed to find a playback_id with signed policy');
const playbackId = playbackIdEntry.id;
const assetId = payload.data.id;
return { playbackId, assetId };
}
export async function createMuxAsset(job: Job) {
const pb = await getPocketBaseClient();
const vodId = job.data?.vodId;
if (!vodId) throw new Error('vodId is missing in the job data.');
const vod = await pb.collection('vods').getOne(vodId);
job.log(`createMuxAsset for vod ${vodId}.`)
const { assetId, playbackId } = (await __createMuxAsset(vod));
job.log(`Created assetId=${assetId}, playbackId=${playbackId}`);
pb.collection('vods').update(vodId, { muxAssetId: assetId, muxPlaybackId: playbackId });
job.log('Vod record updated. All done.');
}

View File

@ -20,21 +20,18 @@
import type { Helpers } from "graphile-worker";
import { PrismaClient } from "../../generated/prisma";
import { withAccelerate } from "@prisma/extension-accelerate";
import { getOrDownloadAsset } from "../utils/cache";
import { env } from "../config/env";
import { getS3Client, uploadFile } from "../utils/s3";
import env from "../../.config/env";
import { sshClient } from "../util/sftp";
import { qbtClient, QBTorrentInfo } from "../util/qbittorrent";
import { Job } from "bullmq";
import { getPocketBaseClient } from "../util/pocketbase";
import spawn from "nano-spawn";
import { join, basename } from 'node:path';
import { tmpdir } from "node:os";
import { nanoid } from "nanoid";
import { getNanoSpawn } from "../utils/nanoSpawn";
import logger from "../utils/logger";
import { basename, join } from "node:path";
import { generateS3Path } from "../utils/formatters";
import { sshClient } from "../utils/sftp";
import { qbtClient } from "../utils/qbittorrent/qbittorrent";
import { formatDate } from "date-fns";
const prisma = new PrismaClient().$extends(withAccelerate());
interface Payload {
@ -55,7 +52,7 @@ function assertPayload(payload: any): asserts payload is Payload {
// const spawn = await getNanoSpawn()
// const torrentFilePath = join(env.CACHE_ROOT, `${nanoid()}.torrent`);
// logger.debug('creating torrent & magnet link via imdl');
// job.log('creating torrent & magnet link via imdl');
// const result = await spawn('imdl', [
// 'torrent',
@ -78,7 +75,7 @@ function assertPayload(payload: any): asserts payload is Payload {
// }
// const magnetLink = match[0];
// logger.debug(`Magnet link=${magnetLink}`);
// job.log(`Magnet link=${magnetLink}`);
// return {
// magnetLink,
@ -92,10 +89,10 @@ async function createQBittorrentTorrent(
videoFilePath: string
): Promise<{
magnetLink: string,
torrentFilePath: string
torrentFilePath: string,
info: QBTorrentInfo,
}> {
const torrentInfo = await qbtClient.createTorrent(videoFilePath);
return torrentInfo
return qbtClient.createTorrent(videoFilePath);
}
// async function createTorrentfileTorrent(
@ -117,7 +114,7 @@ async function createQBittorrentTorrent(
// const torrentFilePath = join(env.CACHE_ROOT, `${nanoid()}.torrent`);
// logger.debug('creating torrent & magnet link')
// job.log('creating torrent & magnet link')
// const result = await spawn('torrentfile', [
// 'create',
// '--magnet',
@ -140,7 +137,7 @@ async function createQBittorrentTorrent(
// }
// const magnetLink = match[0];
// logger.debug(`Magnet link=${magnetLink}`);
// job.log(`Magnet link=${magnetLink}`);
// return {
// magnetLink,
@ -150,30 +147,30 @@ async function createQBittorrentTorrent(
// }
async function uploadTorrentToSeedbox(videoFilePath: string, torrentFilePath: string) {
async function uploadTorrentToSeedbox(job: Job, videoFilePath: string, torrentFilePath: string) {
job.log(`Uploading ${videoFilePath} to seedbox...`);
await sshClient.uploadFile(videoFilePath, './data');
job.log(`Uploading ${torrentFilePath} to seedbox...`);
await sshClient.uploadFile(torrentFilePath, './watch');
}
export default async function main(payload: any, helpers: Helpers) {
export async function createTorrent(job: Job) {
const payload = job.data
assertPayload(payload)
const { vodId } = payload
const vod = await prisma.vod.findFirstOrThrow({
where: {
id: vodId
}
})
const pb = await getPocketBaseClient();
const vod = await pb.collection('vods').getOne(vodId, { expand: 'vtubers' });
// const spawn = await getNanoSpawn();
// * [x] load vod
// * [x] exit if video.thumbnail already defined
// * [x] exit if video.magnetLink already defined
if (vod.magnetLink) {
logger.info(`Doing nothing-- vod ${vodId} already has a magnet link.`)
job.log(`Doing nothing-- vod ${vodId} already has a magnet link.`)
return; // Exit the function early
}
@ -182,25 +179,29 @@ export default async function main(payload: any, helpers: Helpers) {
}
logger.info('Creating torrent.')
const s3Client = getS3Client()
job.log('Creating torrent.');
// we gotta put the download in a place that qbittorrent docker container can access it
const formattedDate = formatDate(vod.streamDate, 'yyyy-MM-dd');
const vtubers = vod?.expand?.vtubers?.map((vt: { displayName: string, slug: string }) => vt.slug).join('-');
const videoFilePath = join(tmpdir(), `${formattedDate}-${vtubers}-${vod.id}.mp4`);
// * [x] download video segments from pull-thru cache
const videoFilePath = await getOrDownloadAsset(s3Client, env.S3_BUCKET, vod.sourceVideo)
logger.debug(`videoFilePath=${videoFilePath}`)
const dlFrom = `b2://${env.AWS_BUCKET}/${vod.sourceVideo}`;
job.log(`downloading ${dlFrom} to ${videoFilePath}`);
await spawn('b2', ['file', 'download', dlFrom, videoFilePath]);
const { magnetLink, torrentFilePath } = await createQBittorrentTorrent(vodId, videoFilePath)
const { magnetLink, torrentFilePath } = await createQBittorrentTorrent(vodId, videoFilePath);
await uploadTorrentToSeedbox(videoFilePath, torrentFilePath)
await uploadTorrentToSeedbox(job, videoFilePath, torrentFilePath);
logger.debug(`updating vod record`);
await prisma.vod.update({
where: { id: vodId },
data: { magnetLink }
job.log(`updating vod record`);
await pb.collection('vods').update(vod.id, {
magnetLink
});
logger.info(`🏆 torrent creation complete.`)
job.log(`🏆 torrent creation complete.`);
}

View File

@ -1,30 +1,55 @@
import type { Task, Helpers } from "graphile-worker";
import { PrismaClient } from "../../generated/prisma";
import { withAccelerate } from "@prisma/extension-accelerate";
import logger from "../utils/logger";
import { Job } from "bullmq";
import { getPocketBaseClient } from "../util/pocketbase";
import Client from "pocketbase";
import { generalQueue } from "../queues/generalQueue";
const prisma = new PrismaClient().$extends(withAccelerate());
const findWork: Task = async (_payload, helpers: Helpers) => {
logger.info(`findWork begin.`);
export async function findMissingTorrent(job: Job, pb: Client) {
const approvedUploads = await prisma.vod.findMany({
where: {
OR: [
{ status: "approved" },
{ status: "pending" },
]
},
const results = await pb.collection('vods').getList(1, 1, {
filter: "videoSrcB2 != '' && magnetLink = ''",
sort: '-streamDate'
});
const vods = results.items;
const vod = vods[0];
logger.info(`findWork found ${approvedUploads.length} uploads.`);
for (let i = 0; i < approvedUploads.length; i++) {
const vod = approvedUploads[i];
await helpers.addJob("scheduleVodProcessing", { vodId: vod.id });
logger.info(`scheduleVodProcessing for vod ${vod.id}`);
job.log(`findWork found ${vod.id} in need of a torrent.`);
const jobId = `createTorrent-${vod.id}`;
const existingJob = await generalQueue.getJob(jobId);
// only add a createTorrent job if there isn't one queued
if (existingJob?.isActive) {
job.log(`refusing to add createTorrent job for vod ${vod.id} because a similar job is active`);
} else {
await generalQueue.add('createTorrent', {
vodId: vod.id
}, {
jobId
});
}
}
logger.info(`findWork finished.`);
};
export default findWork;
/**
*
* findWork
*
* Remember to makes processors
* * idempotent
* * fail fast
* * DRY
*/
export async function findWork(job: Job) {
const pb = await getPocketBaseClient();
await findMissingTorrent(job, pb);
// findMissingThumbnail
// findMissingAudioAnalysis
// ... etc.
}

View File

@ -0,0 +1,34 @@
import { getTweetDates, getTweetId, tweetIdToDate } from "./getAnnounceUrlDetails";
import { test, expect, describe } from 'vitest';
describe('getAnnounceUrlDetails integration', () => {
test("getTweets", async () => {
const testData = [
{
announceUrl: 'https://x.com/ProjektMelody/status/1989881851817988146',
listedDate: '2025-11-16T02:22:58.000Z',
expectedDate: '2025-11-16T02:22:58.347Z'
}, {
announceUrl: 'https://x.com/ProjektMelody/status/1266096281002356736',
listedDate: '2020-05-28T19:57:30.000Z',
expectedDate: '2020-05-28T19:57:30.979Z',
}, {
announceUrl: 'https://x.com/ProjektMelody/status/1481416541493153795',
listedDate: '2022-01-13T00:03:21.000Z',
expectedDate: '2022-01-13T00:03:21.537Z',
}
]
const data = testData.map((datum) => ({ ...datum, date: tweetIdToDate(getTweetId(datum.announceUrl)) }))
for (const datum of data) {
expect(datum.date).toBeTruthy();
expect(new Date(datum.date)).toStrictEqual(new Date(datum.expectedDate));
}
}, 30000);
})

View File

@ -0,0 +1,74 @@
import { Job } from "bullmq";
import { getPocketBaseClient } from "../util/pocketbase";
const TWEET_URL_REGEX =
/^https?:\/\/(twitter\.com|x\.com)\/[A-Za-z0-9_]{1,15}\/status\/(\d+)(\?.*)?$/;
export function tweetIdToDate(id: string | number): Date {
const snowflake = BigInt(id);
const timestamp = (snowflake >> 22n) + 1288834974657n;
return new Date(Number(timestamp));
}
export function getTweetId(url: string) {
if (!TWEET_URL_REGEX.test(url)) {
throw new Error(`Invalid tweet URL: ${url}`);
}
return url.split('/').at(-1)!;
}
export function getTweetDates(tweetUrls: string[]): Date[] {
tweetUrls.forEach((url) => {
if (!TWEET_URL_REGEX.test(url)) {
throw new Error(`Invalid tweet URL: ${url}`);
}
});
return tweetUrls
.map(url => url.split('/').at(-1)!) // add ! if youre sure it exists
.map(tweetID => tweetIdToDate(tweetID));
}
export async function getVodsWithAnnounceUrlAndNoStreamDate() {
const pb = await getPocketBaseClient()
const results = await pb.collection('vods').getList(1, 25, {
filter: "announceUrl != '' && streamDate = ''"
})
const vods = results.items;
return vods;
}
/**
*
* getAnnounceUrlDetails
*
* given a vod.announceUrl, get the tweet timestamp and populate the streamDate
*/
export async function getAnnounceUrlDetails(job: Job) {
const pb = await getPocketBaseClient();
const vods = await getVodsWithAnnounceUrlAndNoStreamDate();
job.log(`getAnnounceUrlDetails found ${vods.length} vods in need of a streamDate.`)
for (let i = 0; i < vods.length; i++) {
const vod = vods[i];
job.log(`processing vod ${vod.id}`);
const announceUrl = vod.announceUrl;
job.log(`announceUrl is ${vod.announceUrl}`);
job.log(`getting streamDate from tweet`);
const streamDate = tweetIdToDate(getTweetId(announceUrl));
job.log(`streamDate is ${streamDate}`);
job.log('updating vod ' + vod.id);
await pb.collection('vods').update(vod.id, {
streamDate
})
}
}

View File

@ -0,0 +1,17 @@
import { __screenRecordFansly } from "./screenRecordFansly";
import { test, expect, describe } from 'vitest';
describe('screenRecordFansly', () => {
test("browser spawn", async () => {
const result = await __screenRecordFansly('https://fansly.com/miamimilana');
expect(result).toEqual({
complete: true,
results: {
"foo": "bar"
}
});
}, 120000);
})

View File

@ -0,0 +1,264 @@
import { type Job } from 'bullmq';
import { getPocketBaseClient } from '../util/pocketbase';
import spawn, { SubprocessError } from 'nano-spawn';
import { type Page, Locator } from 'puppeteer';
import puppeteer from 'puppeteer-extra';
import StealthPlugin from 'puppeteer-extra-plugin-stealth'
import env from '../../.config/env';
puppeteer.use(StealthPlugin())
async function sleep(delay: number) {
return new Promise<void>((resolve) => {
setTimeout(resolve, delay);
});
}
type VTuber = {
id: string,
fansly: string,
}
export async function screenRecordFansly(job: Job) {
job.log(`the job '${job.name}' is running`);
const vtuberSlug = job.data?.vtuberSlug;
if (!vtuberSlug) throw new Error('vtuberSlug was missing from job data.');
const pb = await getPocketBaseClient();
const list = await pb.collection('vtubers').getList(1, 50, {
filter: `slug = ${vtuberSlug}`
})
if (list.items.length == 0) throw new Error(`failed to find matching vtuber (slug=${vtuberSlug}) in the database.`);
const vtuber = list.items.at(0) as unknown as VTuber;
const fansly = vtuber?.fansly
if (!fansly) throw new Error(`vtuber ${vtuber?.id} was missing a fansly URL`);
await __screenRecordFansly(fansly);
}
export async function __screenRecordFansly(fansly: string) {
let loop = true;
// get the vtuber we are to record (pocketbase record)
// get the vtuber's fansly
// open Firefox to the fansly URL
// ensure we are logged in
// ensure the stream is started
// open OBS and record
// detect the end of stream
// stop recording
// check for chromium
// try {
// await spawn('brave-browser', ['--help']);
// } catch (e) {
// if (e instanceof SubprocessError) {
// if (e.exitCode !== 0) {
// throw new Error('chromium exited with rc ' + e.exitCode);
// }
// }
// }
// await spawn('firefox', ['--display', ':0.0', '-P', 'fp-worker', fansly]);
// Launch the browser and open a new blank page.
const browser = await puppeteer.launch({ headless: false, executablePath: '/usr/bin/brave-browser' });
const page = await browser.newPage();
// Navigate the page to a URL.
await page.goto(fansly);
// Set screen size.
await page.setViewport({ width: 1920, height: 1080 });
// determine the screen we are on
async function handlePage(page: Page) {
console.log('Looking for a known page...');
const warningPage = page.locator('div ::-p-text(Possible Age Restricted Content)');
const loginModal = page.locator('div ::-p-text(Forgot password?)');
const liveLoginRequiredPage = page.locator('::-p-text(to view the Stream)');
const modelPage = page.locator('::-p-text( Live )');
const notificationsModal = page.locator('::-p-text(Enable Push Notifications)');
const streamSensitiveContent = page.locator('::-p-text(This Stream may contain sensitive content)');
const following = page.locator('div ::-p-text(following)');
const payment = page.locator('div ::-p-text(Valid Payment Method Required)');
const video = page.locator('::-p-xpath(//video[contains(@src, "blob:")])')
const foundPage = await Promise.race([
warningPage.wait().then(() => 'warning'),
loginModal.wait().then(() => 'login'),
liveLoginRequiredPage.wait().then(() => 'liveLoginRequired'),
modelPage.wait().then(() => 'model'),
notificationsModal.wait().then(() => 'notifications'),
streamSensitiveContent.wait().then(() => 'sensitive'),
following.wait().then(() => 'following'),
payment.wait().then(() => 'payment'),
video.wait().then(() => 'video'),
])
console.log('foundPage', foundPage);
switch (foundPage) {
case 'video':
await handleVideo(page);
break;
case 'warning':
await handleWarningModal(page);
break;
case 'login':
await handleLoginPage(page);
break;
case 'liveLoginRequired':
await handleLiveLoginRequired(page);
break;
case 'model':
await handleModelPage(page);
break;
case 'fansly':
await handleLoginPage(page);
break;
case 'sensitive':
await handleStreamSensitiveContent(page);
break;
case 'notifications':
await handleNotifications(page);
break;
case 'following':
await handleFollowing(page);
break;
case 'payment':
throw new Error('payment method is required! Please use an account that already has payment enabled');
break;
default:
console.error("Unknown page");
throw new Error("No known page matched");
}
}
// 🟦 Page-specific handlers
async function handleVideo(page: Page) {
console.log('Handling video');
// * [ ] ensure we are at 50% volume
// * [ ] ensure we are playing
// * [ ] ensure we are theater mode
// * [ ] ensure we hide the "Brave is being controlled by automated test software"
// * [ ] ensure we hide brave notifications
await page.locator('')
loop = false;
}
async function handleFollowing(page: Page) {
console.log('Handling following');
await page.locator('div ::-p-text(following)').click();
}
async function handleStreamSensitiveContent(page: Page) {
console.log('Handling stream sensitive content');
await page.locator('div ::-p-text(here)').click();
}
async function handleNotifications(page: Page) {
console.log('Handling push notifications modal');
await page.locator('div ::-p-text(Maybe Later)').click();
}
async function handleModelPage(page: Page) {
console.log('Handling model page...');
await page.locator('::-p-text( Live )').click();
}
async function handleLoginPage(page: Page) {
console.log("Handling login page...");
await page.locator('#fansly_login').fill(env.FANSLY_USERNAME);
await page.locator('#fansly_password').fill('');
await page.locator('#fansly_password').fill(env.FANSLY_PASSWORD);
await page.locator('::-p-text(Sign in)').click();
}
async function handleLiveLoginRequired(page: Page) {
console.log("Handling handleLiveLoginRequired...");
await page.locator('body > app-root > div > div.site-wrapper.nav-bar-visible.nav-bar-mobile-visible.nav-bar-top-visible.is-live-route > div > app-live-route > div > div > div.flex-col.video-wrapper > div.flex-row.width-100.stream-placeholder > div.stream-placeholder-content > div > span:nth-child(1)').click();
}
async function handleTosModal(page: Page) {
console.log("Handling TOS modal...");
await page.locator('#accept').click();
}
async function handleEmailVerification(page: Page) {
console.log("Handling email verification page...");
await page.locator('#resend-link').click();
}
async function handleWarningModal(page: Page) {
console.log('clicking the Enter button...')
// const el = await page.locator('div ::-p-text(Enter)').waitHandle();
// el?.evaluate((element) => element.click())
const el = await page.locator('div ::-p-text("Enter")').waitHandle();
if (el) {
await el.click({ count: 2, delay: 1000, debugHighlight: true }); // this usually fails
page.$eval('div.btn:nth-child(2)', el => el.click()); // this usually works
}
}
while (loop) {
try {
await handlePage(page);
} catch (e) {
console.error('error while handling page.')
console.error(e)
}
await sleep(3000);
}
console.log('Now we sit back and enjoy teh show');
await obsRecord();
// @todo wait for stream to end
await sleep(60000);
// await browser.close();
const results = {
foo: 'bar'
}
return { complete: true, results };
}
export async function obsRecord() {
await spawn('obs', ['--startrecording', '--minimize-to-tray']);
}

View File

@ -1,5 +1,6 @@
import { Queue } from 'bullmq';
export const generalQueue = new Queue('generalQueue');
import { connection } from '../../.config/bullmq.config';
export const generalQueue = new Queue('generalQueue', { connection });
await generalQueue.upsertJobScheduler(
'every-day-presign-mux-job',
@ -14,14 +15,7 @@ await generalQueue.upsertJobScheduler(
);
// await generalQueue.upsertJobScheduler(
// 'copy-v2',
// {
// pattern: '* * * * *', // Runs at 07:03 every day
// },
// {
// name: 'copyV2ThumbToV3',
// data: {},
// opts: {}, // Optional additional job options
// },
// );
// 'find-work-every-one-minute',
// { every: 1000 * 60 },
// { name: 'findWork' }
// )

View File

@ -1,7 +1,7 @@
import { Queue } from "bullmq";
export const gpuQueue = new Queue('gpuQueue');
import { connection } from "../../.config/bullmq.config";
export const gpuQueue = new Queue('gpuQueue', { connection });
await gpuQueue.upsertJobScheduler(
'schedule-vod-processing-recurring',

View File

@ -1,5 +1,6 @@
import { Queue } from "bullmq";
export const highPriorityQueue = new Queue('highPriorityQueue');
import { connection } from "../../.config/bullmq.config";
export const highPriorityQueue = new Queue('highPriorityQueue', { connection });
await highPriorityQueue.upsertJobScheduler(
'sync-patreon-recurring-job',
@ -11,4 +12,16 @@ await highPriorityQueue.upsertJobScheduler(
data: {},
opts: {}
},
)
)
await highPriorityQueue.upsertJobScheduler(
'get-announce-url-details',
{
every: 1000 * 63
},
{
name: 'getAnnounceUrlDetails',
data: {},
opts: {},
},
);

View File

@ -1,4 +0,0 @@
import { Queue } from 'bullmq';
import { connection } from '../../.config/bullmq.config.ts';
export const parentMq = new Queue('parentMq', { connection });

View File

@ -0,0 +1,7 @@
export type Vod = {
id: string,
magnetLink: string,
announceUrl: string,
videoSrcB2: string,
sourceVideo: string,
}

View File

@ -0,0 +1,7 @@
qbittorrent library common problems and their solutions
### Error: Torrent creation failed: Create new torrent file failed. Reason: no files in torrent [libtorrent:16].
This can happen when you give createTorrent() a file path that doesn't exist.
The solution is to give createTorrent() a file path that exists in the qbittorrent-nox context. (If you're using a docker container, it could be a mounted volume issue)

View File

@ -0,0 +1,57 @@
import { QBittorrentClient } from "./qbittorrent";
import { test, expect, describe, beforeAll, expectTypeOf } from 'vitest';
import { join } from "node:path";
const fixturesDir = join(import.meta.dirname, '..', 'fixtures');
const torrentFixture = join(fixturesDir, 'ubuntu-24.04.3-desktop-amd64.iso.torrent');
const fileFixture = join(fixturesDir, 'pizza.avif');
describe('qbittorrent integration', () => {
let client: QBittorrentClient
beforeAll(async () => {
client = new QBittorrentClient();
});
test("QBittorrentClient methods", async () => {
expect(client).toHaveProperty('addTorrent');
expect(client).toHaveProperty('getInfoHashV2');
expect(client).toHaveProperty('connect');
expect(client).toHaveProperty('getMagnetLink');
expect(client).toHaveProperty('createTorrent');
expect(client).toHaveProperty('__getVersion');
expect(client).toHaveProperty('__fetchTorrentFile');
});
test("__getVersion", async () => {
await client.connect();
const { major, minor, patch, version } = await client.__getVersion();
expectTypeOf(major).toBeNumber();
expectTypeOf(minor).toBeNumber();
expectTypeOf(patch).toBeNumber();
expectTypeOf(version).toBeString();
});
test("addTorrent", async () => {
const torrent = await client.addTorrent(torrentFixture);
expect(torrent).toHaveProperty('infohash_v1');
expect(torrent).toHaveProperty('infohash_v2');
expect(torrent).toHaveProperty('added_on');
expect(torrent).toHaveProperty('magnet_uri');
});
test("connect", async () => {
await client.connect();
});
test("createTorrent", async () => {
const torrent = await client.createTorrent(fileFixture);
console.log('torrent', torrent)
expect(torrent).toHaveProperty('magnetLink');
expect(torrent).toHaveProperty('torrentFilePath');
expect(torrent).toHaveProperty('info');
});
})

View File

@ -0,0 +1,500 @@
/**
* src/utils/qbittorrent.ts
*
* qBittorrent API client
* Used for creating torrent v1/v2 hybrids.
*
*
* to keep things simple,
* we use the same mounted volume directory names inside the docker container
* as on the host.
*
* /srv/futureporn/worker/qbittorrent/downloads
*
*/
import path from "node:path";
import env from "../../.config/env";
import { readFile, writeFile } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join, basename } from "node:path";
import { nanoid } from 'nanoid';
import semverParse from 'semver/functions/parse';
import { type SemVer } from 'semver';
interface QBittorrentClientOptions {
host?: string;
port?: number;
username?: string;
password?: string;
}
export interface TorrentCreatorTaskStatus {
errorMessage: string;
optimizeAlignment: boolean;
paddedFileSizeLimit: number;
pieceSize: number;
private: boolean;
sourcePath: string;
status: "Running" | "Finished" | "Failed" | string; // API may expand
taskID: string;
timeAdded: string; // raw string from qBittorrent
timeFinished: string; // raw string
timeStarted: string; // raw string
trackers: string[];
urlSeeds: string[];
}
export type TorrentCreatorTaskStatusMap = Record<string, TorrentCreatorTaskStatus>;
export interface QBTorrentInfo {
added_on: number;
amount_left: number;
auto_tmm: boolean;
availability: number;
category: string;
comment: string;
completed: number;
completion_on: number;
content_path: string;
dl_limit: number;
dlspeed: number;
download_path: string;
downloaded: number;
downloaded_session: number;
eta: number;
f_l_piece_prio: boolean;
force_start: boolean;
has_metadata: boolean;
hash: string;
inactive_seeding_time_limit: number;
infohash_v1: string;
infohash_v2: string;
last_activity: number;
magnet_uri: string;
max_inactive_seeding_time: number;
max_ratio: number;
max_seeding_time: number;
name: string;
num_complete: number;
num_incomplete: number;
num_leechs: number;
num_seeds: number;
popularity: number;
priority: number;
private: boolean;
progress: number;
ratio: number;
ratio_limit: number;
reannounce: number;
root_path: string;
save_path: string;
seeding_time: number;
seeding_time_limit: number;
seen_complete: number;
seq_dl: boolean;
size: number;
state: string;
super_seeding: boolean;
tags: string;
time_active: number;
total_size: number;
tracker: string;
trackers_count: number;
up_limit: number;
uploaded: number;
uploaded_session: number;
upspeed: number;
}
/**
* QBittorrentClient
*
* @see https://qbittorrent-api.readthedocs.io/en/latest/apidoc/torrentcreator.html
*/
export class QBittorrentClient {
private readonly host: string;
private readonly port: number;
private readonly username: string;
private readonly password: string;
private readonly baseUrl: string;
private sidCookie: string | null = null;
constructor(options: QBittorrentClientOptions = {}) {
const defaults = {
host: "localhost",
port: 8083,
username: "admin",
password: "adminadmin",
};
const envOptions = {
host: env.QBT_HOST!,
port: Number(env.QBT_PORT!),
username: env.QBT_USERNAME!,
password: env.QBT_PASSWORD!,
};
const { host, port, username, password } = {
...defaults,
...envOptions,
...options,
};
this.host = host;
this.port = port;
this.username = username;
this.password = password;
this.baseUrl = `http://${this.host}:${this.port}`;
}
async connect(): Promise<void> {
console.log(`Connecting to qBittorrent at ${this.baseUrl}`);
await this.login();
}
/**
* Throw if QBittorrent version is less than 5
*/
async versionCheck(): Promise<void> {
await this.connect();
const { major, version } = (await this.__getVersion());
if (major < 5) throw new Error(`QBittorrent is outdated. Expected version >5. Got ${version}`);
}
/**
*
* Query QBittorrent to get it's version.
*/
async __getVersion(): Promise<SemVer> {
const url = `${this.baseUrl}/api/v2/app/version`;
const res = await fetch(url, { headers: { "Cookie": this.sidCookie! } });
const text = await res.text();
const v = semverParse(text);
if (!v?.version) throw new Error(`failed to parse version from body text=${text}`);
return v;
}
/**
* Logs into qBittorrent Web API.
*
* Example (cURL):
* curl -i \
* --header 'Referer: http://localhost:8080' \
* --data 'username=admin&password=adminadmin' \
* http://localhost:8080/api/v2/auth/login
*
* Then use the returned SID cookie for subsequent requests.
*/
private async login(): Promise<void> {
console.log(`login() begin. using username=${this.username}, password=${this.password} env=${env.NODE_ENV}`)
const response = await fetch(`${this.baseUrl}/api/v2/auth/login`, {
method: "POST",
headers: {
"Content-Type": "application/x-www-form-urlencoded",
Referer: this.baseUrl,
},
body: new URLSearchParams({
username: this.username,
password: this.password,
}),
});
const responseBody = await response.text();
if (!response.ok) {
const msg = `Login request failed (${response.status} ${response.statusText}). body=${responseBody}`;
console.error(msg);
throw new Error(msg);
}
console.log(`Login response: status=${response.status} ${response.statusText}`);
console.log(`Headers: ${JSON.stringify([...response.headers.entries()])}`);
// Extract SID cookie
const setCookie = response.headers.get("set-cookie");
if (!setCookie) {
const msg = `Login failed: No SID cookie was returned. status=${response.status} ${response.statusText}. body=${responseBody}`;
console.error(msg);
throw new Error(msg);
}
this.sidCookie = setCookie;
console.log("Successfully logged into qBittorrent.");
}
/**
* addTorrentCreationTask
*
* @param sourcePath - a file on disk that we want to turn into a torrent
* @returns torrentFilePath - a newly created torrent file
* @IMPORTANT we do not send a multipart form here.
* we only send a application/x-www-form-urlencoded
* form, and we do not upload file data. we only send a file path.
*/
private async addTorrentCreationTask(sourcePath: string): Promise<string> {
const torrentFilePath = join(tmpdir(), `${nanoid()}.torrent`);
const url = `${this.baseUrl}/api/v2/torrentcreator/addTask`;
console.log(`addTorrentCreationTask using sourcePath=${sourcePath}, torrentFilePath=${torrentFilePath}, url=${url}`);
console.log(`addTorrent using sourcePath=${sourcePath}`)
if (!this.sidCookie) {
throw new Error("Not connected: SID cookie missing");
}
const trackers = [
'udp://tracker.future.porn:6969/announce'
]
const params = new URLSearchParams({
sourcePath,
isPrivate: "false",
format: "hybrid",
torrentFilePath: torrentFilePath,
comment: "https://futureporn.net",
source: "https://futureporn.net",
// trackers: trackers.join('\n'), // this doesn't work. the two trackers appear on the same line in a single, invalid URL
urlSeeds: "",
startSeeding: "false", // it always fails to seed right away (it works later.)
});
params.append('trackers', trackers[0]);
// params.append('trackers', trackers[1]); // this is a problem. it overrides the first.
const res = await fetch(url, {
method: "POST",
headers: {
Cookie: this.sidCookie,
'Content-Type': 'application/x-www-form-urlencoded',
},
body: params as any,
});
if (!res.ok) {
const body = await res.text();
throw new Error(`addTorrentCreationTask failed: ${res.status} ${res.statusText} ${body}`);
}
console.log('addTorrent success.');
if (!res.ok) {
const body = await res.text()
throw new Error(`addTask failed. status=${res.status} statusText=${res.statusText} body=${body}`);
}
const text = await res.text();
if (text.includes('Fail')) {
throw new Error('the response shows a failure--' + text);
}
const data = JSON.parse(text);
console.log({ addTaskResponse: data });
return data.taskID;
}
private async pollTorrentStatus(taskId: string): Promise<TorrentCreatorTaskStatus> {
while (true) {
console.log(`Polling torrent creation taskID=${taskId}`);
const res = await fetch(
`${this.baseUrl}/api/v2/torrentcreator/status?taskId=${taskId}`,
{ headers: { Cookie: this.sidCookie! } }
);
if (!res.ok) {
throw new Error(`status failed: ${res.status} ${res.statusText}`);
}
console.log('the request to poll for torrent status was successful.')
const statusMap = (await res.json()) as TorrentCreatorTaskStatusMap;
console.log({ statusMap: statusMap })
const task = Object.values(statusMap).find((t) => t.taskID === taskId);
console.log({ task: task })
if (!task) {
throw new Error(`Task ${taskId} not found in status response`);
}
console.log(` Torrent creator task status=${task.status}`);
switch (task.status) {
case "Failed":
const msg = `Torrent creation failed: ${task.errorMessage}`;
console.error(msg);
console.log('here is the task that failed', task);
throw new Error(msg);
case "Finished":
return task;
default:
// still running.. wait 1s and retry
await new Promise((r) => setTimeout(r, 1000));
}
}
}
/**
* Fetch a .torrent file from qBittorrent.
* qBittorrent writes the .torrent file to a given local path
*
* @param taskId
* @param outputDir - the torrent will be written to this directory
* @returns
*/
private async __fetchTorrentFile(taskId: string, outputDir: string): Promise<string> {
const url = `${this.baseUrl}/api/v2/torrentcreator/torrentFile?taskID=${taskId}`;
const res = await fetch(url, {
headers: { Cookie: this.sidCookie! }, // or use Authorization header
});
if (!res.ok) {
const body = await res.text();
throw new Error(`torrentFile failed: ${res.status} ${res.statusText} ${body}`);
}
const arrayBuffer = await res.arrayBuffer();
const filePath = join(outputDir, `${taskId}.torrent`);
await writeFile(filePath, new Uint8Array(arrayBuffer));
return filePath;
}
private async __getTorrentInfos(torrentName: string): Promise<QBTorrentInfo> {
if (!torrentName) throw new Error('__getTorrentInfos requires torrentName as first arg. However, arg was falsy. ');
// ensure we're logged in
if (!this.sidCookie) throw new Error('We are not logged into QBittorrent. (sidCookie was missing.)');
// qBittorrent does NOT return infoHash directly here
// we have to get it by querying the torrents list
const torrentsRes = await fetch(`${this.baseUrl}/api/v2/torrents/info`, {
headers: { Cookie: this.sidCookie! },
});
const torrents = await torrentsRes.json() as Array<{ hash: string; name: string }>;
const torrent = torrents.find((t) => t.name === torrentName) as QBTorrentInfo;
if (!torrent) {
throw new Error(`Torrent ${torrentName} not found in qBittorrent after adding`);
}
return torrent;
}
async getInfoHashV2(torrentName: string): Promise<string> {
console.log(`getInfoHashV2 using torrentName=${torrentName}`)
const torrent = await this.__getTorrentInfos(torrentName);
return torrent.infohash_v2;
}
/**
* Add a local torrent file to qBittorrent.
*/
async addTorrent(localFilePath: string) {
if (!localFilePath) throw new Error('addTorrent requires a path to a local file, but it was undefined.');
const bn = basename(localFilePath).replace('.torrent', '');
await this.connect();
await this.__addTorrent(localFilePath);
return (await this.__getTorrentInfos(bn));
}
/**
* Add a local torrent file to qBittorrent.
*/
private async __addTorrent(localFilePath: string): Promise<void> {
console.log(`addTorrent using localFilePath=${localFilePath}`)
if (!this.sidCookie) {
throw new Error("Not connected. (SID cookie missing.)");
}
const form = new FormData();
const fileBuffer = await readFile(localFilePath);
const blob = new Blob([fileBuffer]); // wrap Buffer in Blob (necessary for MDN FormData)
form.append("torrents", blob, path.basename(localFilePath)); // this is for MDN FormData
// form.append("torrents", fileBuffer); // this is for npm:form-data
form.append("savepath", "/tmp"); // optional: specify download path
form.append("paused", "true"); // start downloading immediately
const res = await fetch(`${this.baseUrl}/api/v2/torrents/add`, {
method: "POST",
headers: {
Cookie: this.sidCookie,
},
body: form as any,
});
if (!res.ok) {
const body = await res.text();
throw new Error(`addTorrent failed: ${res.status} ${res.statusText} ${body}`);
}
console.log('addTorrent success.');
}
async getMagnetLink(fileName: string): Promise<string> {
console.log(`getMagnetLink using fileName=${fileName}`)
// qBittorrent does NOT return infoHash directly here
// we have to get it by querying the torrents list
const torrent = await this.__getTorrentInfos(fileName);
if (!torrent) {
throw new Error(`Torrent ${fileName} not found in qBittorrent after adding`);
}
return torrent.magnet_uri;
}
async createTorrent(localFilePath: string): Promise<{ torrentFilePath: string; magnetLink: string, info: QBTorrentInfo }> {
console.log(`Creating torrent from file: ${localFilePath}`);
await this.connect();
if (!this.sidCookie) {
throw new Error("sidCookie was missing. This is likely a bug.");
}
// 1. start task
const taskId = await this.addTorrentCreationTask(localFilePath);
console.log(`Created torrent task ${taskId}`);
// 2. poll until finished
await this.pollTorrentStatus(taskId);
// 3. fetch torrent file
const torrentFilePath = await this.__fetchTorrentFile(taskId, tmpdir());
// 4. add the torrent to qBittorrent
// We *could* add the torrent in the torrentcreator,
// but that usually errors right away,
// so we add it here instead.
await this.__addTorrent(torrentFilePath);
// 5. Get magnet link
const info = await this.__getTorrentInfos(basename(localFilePath))
const magnetLink = info.magnet_uri;
return {
magnetLink, torrentFilePath, info
};
}
}
export const qbtClient = new QBittorrentClient({ host: env.QBT_HOST });

View File

@ -0,0 +1,15 @@
import sleep from './sleep.ts';
export default async function retry(fn: any, retries = 6, delayMs = 500) {
let lastError;
for (let attempt = 1; attempt <= retries; attempt++) {
try {
return await fn();
} catch (err) {
lastError = err;
console.warn(`Attempt ${attempt} failed: ${err}. Retrying in ${delayMs}ms...`);
if (attempt < retries) await sleep(delayMs);
}
}
throw lastError;
}

View File

@ -0,0 +1,118 @@
// src/utils/sftp.ts
import { Client, ConnectConfig, SFTPWrapper } from 'ssh2';
import path from 'path';
import env from '../../.config/env';
interface SSHClientOptions {
host: string;
port?: number;
username: string;
password?: string;
privateKey?: Buffer;
}
export class SSHClient {
private client = new Client();
private sftp?: SFTPWrapper;
private connected = false;
constructor(private options: SSHClientOptions) { }
async connect(): Promise<void> {
if (this.connected) return;
await new Promise<void>((resolve, reject) => {
this.client
.on('ready', () => resolve())
.on('error', reject)
.connect({
host: this.options.host,
port: this.options.port || 22,
username: this.options.username,
password: this.options.password,
privateKey: this.options.privateKey,
} as ConnectConfig);
});
this.connected = true;
}
private async getSFTP(): Promise<SFTPWrapper> {
if (!this.sftp) {
this.sftp = await new Promise((resolve, reject) => {
this.client.sftp((err, sftp) => {
if (err) reject(err);
else resolve(sftp);
});
});
}
return this.sftp;
}
async exec(command: string): Promise<string> {
await this.connect();
return new Promise<string>((resolve, reject) => {
this.client.exec(command, (err, stream) => {
if (err) return reject(err);
let stdout = '';
let stderr = '';
stream
.on('close', (code: number) => {
if (code !== 0) reject(new Error(`Command failed: ${stderr}`));
else resolve(stdout.trim());
})
.on('data', (data: Buffer) => (stdout += data.toString()))
.stderr.on('data', (data: Buffer) => (stderr += data.toString()));
});
});
}
async uploadFile(localFilePath: string, remoteDir: string): Promise<void> {
console.log(`Uploading localFilePath=${localFilePath} to remoteDir=${remoteDir}...`);
console.log('awaiting connect')
await this.connect();
console.log('getting sftp')
const sftp = await this.getSFTP();
console.log('getting fileName')
const fileName = path.basename(localFilePath);
console.log(`fileName=${fileName}`)
const remoteFilePath = path.posix.join(remoteDir, fileName);
console.log(`remoteFilePath=${remoteFilePath}`)
await new Promise<void>((resolve, reject) => {
sftp.fastPut(localFilePath, remoteFilePath, (err) => (err ? reject(err) : resolve()));
});
}
async downloadFile(remoteFilePath: string, localPath: string): Promise<void> {
console.log(`downloading remoteFilePath=${remoteFilePath} to localPath=${localPath}`)
await this.connect();
const sftp = await this.getSFTP();
await new Promise<void>((resolve, reject) => {
sftp.fastGet(remoteFilePath, localPath, (err: any) => (err ? reject(err) : resolve()));
});
}
end(): void {
this.client.end();
this.connected = false;
}
}
// --- usage helper ---
const url = URL.parse(env.SEEDBOX_SFTP_URL);
const hostname = url?.hostname;
const port = url?.port;
export const sshClient = new SSHClient({
host: hostname!,
port: port ? parseInt(port) : 22,
username: env.SEEDBOX_SFTP_USERNAME,
password: env.SEEDBOX_SFTP_PASSWORD,
});

View File

@ -0,0 +1,3 @@
export default async function sleep(ms: number) {
return new Promise(resolve => setTimeout(resolve, ms));
}

View File

@ -3,21 +3,37 @@
import { Worker } from 'bullmq';
import { connection } from '../../.config/bullmq.config.ts';
import { presignMuxAssets } from '../processors/presignMuxAssets.ts';
import { copyV1VideoAll } from '../processors/copyV1VideoAll.ts';
import { copyV2ThumbToV3 } from '../processors/copyV2ThumbToV3.ts';
import { copyV1VideoToV3 } from '../processors/copyV1VideoToV3.ts';
import { createTorrent } from '../processors/createTorrent.ts';
import { analyzeAudio } from '../processors/analyzeAudio.ts';
import { findWork } from '../processors/findWork.ts';
new Worker(
'generalQueue',
async (job) => {
console.log('generalWorker. we got a job on the generalQueue.', job.data, job.name);
switch (job.name) {
case 'findWork':
return await findWork(job);
case 'copyV1VideoAll':
return await copyV1VideoAll(job);
case 'copyV1VideoToV3':
return await copyV1VideoToV3(job);
case 'presignMuxAssets':
return await presignMuxAssets(job);
case 'copyV2ThumbToV3':
return await copyV2ThumbToV3(job);
// case 'analyzeAudio':
// return await analyzeAudio(job);
case 'createTorrent':
return await createTorrent(job);
case 'analyzeAudio':
return await analyzeAudio(job);
default:
throw new Error(`Unknown job name: ${job.name}`);
@ -27,3 +43,4 @@ new Worker(
);
console.log('generalWorker is running...');

View File

@ -3,6 +3,7 @@
import { Worker } from 'bullmq';
import { connection } from '../../.config/bullmq.config.ts';
import { syncronizePatreon } from '../processors/syncronizePatreon.ts'
import { getAnnounceUrlDetails } from '../processors/getAnnounceUrlDetails.ts'
new Worker(
'highPriorityQueue',
@ -11,7 +12,8 @@ new Worker(
switch (job.name) {
case 'syncronizePatreon':
return await syncronizePatreon(job);
case 'getAnnounceUrlDetails':
return await getAnnounceUrlDetails(job);
default:
throw new Error(`Unknown job name: ${job.name}`);
}

View File

@ -2,7 +2,15 @@
loginctl enable-linger
sudo cp worker.service /etc/systemd/user/worker.service
systemctl --user daemon-reload
systemctl --user restart worker
systemctl --user enable worker
systemctl --user status worker
systemctl --user status worker
systemctl status valkey
echo "Done. Press Enter key to get worker logs or press Ctrl+C to quit"
read r
journalctl --user -u worker -ef
systemctl --user status worker

View File

@ -6,7 +6,7 @@ After=network.target
Type=simple
Restart=always
RestartSec=5
ExecStart=/home/cj/Documents/futureporn-monorepo/services/worker/entrypoint.sh
ExecStart=/home/cj/.nvm/versions/node/v22.18.0/bin/node --import tsx ./src/index.ts
WorkingDirectory=/home/cj/Documents/futureporn-monorepo/services/worker
EnvironmentFile=/home/cj/Documents/futureporn-monorepo/services/worker/.env.production.local
Restart=on-failure