diff --git a/services/worker/src/processors/cacheGet.ts b/services/worker/src/processors/cacheGet.ts index 0021fe3a..1c55c51c 100644 --- a/services/worker/src/processors/cacheGet.ts +++ b/services/worker/src/processors/cacheGet.ts @@ -53,7 +53,7 @@ export async function getB2FileInfo(job: Job, s3Key: string): Promise const args = ["file", "info", `b2://${env.AWS_BUCKET}/${s3Key}`]; let stdout: string; - + await job.log(`Running ${cmd}, ${args.join(' ')}`); try { const result = await spawn(cmd, args); diff --git a/services/worker/src/processors/createTorrent.ts b/services/worker/src/processors/createTorrent.ts index a40249c1..56298580 100644 --- a/services/worker/src/processors/createTorrent.ts +++ b/services/worker/src/processors/createTorrent.ts @@ -150,7 +150,9 @@ async function createQBittorrentTorrent( async function uploadTorrentToSeedbox(job: Job, videoFilePath: string, torrentFilePath: string) { job.log(`Uploading ${videoFilePath} to seedbox...`); - await sshClient.uploadFile(videoFilePath, './data'); + await sshClient.uploadFile(videoFilePath, './data', async ({ percent }) => { + await job.log(`Video upload progress: ${percent.toFixed(1)}%`); + }); job.log(`Uploading ${torrentFilePath} to seedbox...`); await sshClient.uploadFile(torrentFilePath, './watch'); @@ -179,7 +181,7 @@ export async function createTorrent(job: Job) { } - job.log('Creating torrent.'); + job.log('Creating torrent. ~'); // we gotta put the download in a place that qbittorrent docker container can access it diff --git a/services/worker/src/processors/download.ts b/services/worker/src/processors/download.ts index a3e9dedb..c4b73779 100644 --- a/services/worker/src/processors/download.ts +++ b/services/worker/src/processors/download.ts @@ -73,21 +73,28 @@ export async function __download(job: Job, s3Key: string, cachePath: string) { if (!job) throw new Error('Job arg0 missing'); if (!s3Key) throw new Error('s3Key arg1 missing'); if (!cachePath) throw new Error('cachePath arg2 missing'); + job.log(`downloading ${s3Key} to ${cachePath}...`); - - const { size } = (await getB2FileInfo(job, s3Key)); + const { size } = await getB2FileInfo(job, s3Key); const stopMonitor = await monitorProgress(cachePath, size, job); - const { stdout } = await spawn('b2', ['file', 'download', `b2://${env.AWS_BUCKET}/${s3Key}`, cachePath]) - job.log(stdout); - stopMonitor(); + try { + const { stdout } = await spawn( + 'b2', + ['file', 'download', `b2://${env.AWS_BUCKET}/${s3Key}`, cachePath] + ); + job.log(stdout); + } finally { + stopMonitor(); // always stop monitor, even on error + } job.log('Download complete.'); } + export default async function download(job: Job) { assertPayload(job.data); const vodId = job.data.vodId; diff --git a/services/worker/src/util/qbittorrent.spec.ts b/services/worker/src/util/qbittorrent.spec.ts index b03f335d..0b026cc9 100644 --- a/services/worker/src/util/qbittorrent.spec.ts +++ b/services/worker/src/util/qbittorrent.spec.ts @@ -1,10 +1,11 @@ import { QBittorrentClient } from "./qbittorrent"; import { test, expect, describe, beforeAll, expectTypeOf } from 'vitest'; -import { join } from "node:path"; +import { join, basename } from "node:path"; const fixturesDir = join(import.meta.dirname, '..', 'fixtures'); -const torrentFixture = join(fixturesDir, 'ubuntu-24.04.3-desktop-amd64.iso.torrent'); +const ubuntuTorrentName = 'ubuntu-24.04.3-desktop-amd64.iso.torrent'; +const torrentFixture = join(fixturesDir, ubuntuTorrentName); const fileFixture = join(fixturesDir, 'pizza.avif'); describe('qbittorrent integration', () => { @@ -14,8 +15,6 @@ describe('qbittorrent integration', () => { }); test("QBittorrentClient methods", async () => { - - expect(client).toHaveProperty('addTorrent'); expect(client).toHaveProperty('getInfoHashV2'); expect(client).toHaveProperty('connect'); @@ -54,4 +53,10 @@ describe('qbittorrent integration', () => { expect(torrent).toHaveProperty('info'); }); + test("deleteTorrent", async () => { + await expect( + client.deleteTorrent(basename(ubuntuTorrentName, '.torrent')) + ).resolves.toBeUndefined(); + }); + }) diff --git a/services/worker/src/util/qbittorrent.ts b/services/worker/src/util/qbittorrent.ts index 661a8733..057640c5 100644 --- a/services/worker/src/util/qbittorrent.ts +++ b/services/worker/src/util/qbittorrent.ts @@ -153,9 +153,16 @@ export class QBittorrentClient { this.baseUrl = `http://${this.host}:${this.port}`; } + /** + * idempotently login to qBittorrent. + * + * + */ async connect(): Promise { - console.log(`Connecting to qBittorrent at ${this.baseUrl}`); - await this.login(); + if (!this.sidCookie) { + console.log(`Connecting to qBittorrent at ${this.baseUrl}`); + await this.__login(); + } } /** @@ -191,8 +198,8 @@ export class QBittorrentClient { * * Then use the returned SID cookie for subsequent requests. */ - private async login(): Promise { - console.log(`login() begin. using username=${this.username}, password=${this.password} env=${env.NODE_ENV}`) + private async __login(): Promise { + console.log(`login() begin. using username=${this.username}, password=${this.password} env=${env.NODE_ENV}`); const response = await fetch(`${this.baseUrl}/api/v2/auth/login`, { method: "POST", headers: { @@ -225,6 +232,8 @@ export class QBittorrentClient { } this.sidCookie = setCookie; + console.log(`sidCookie=${this.sidCookie}`); + console.log("Successfully logged into qBittorrent."); } @@ -395,7 +404,7 @@ export class QBittorrentClient { const torrent = torrents.find((t) => t.name === torrentName) as QBTorrentInfo; if (!torrent) { - throw new Error(`__getTorrentInfos failure. Torrent ${torrentName} not found in qBittorrent after adding`); + throw new Error(`__getTorrentInfos failure. Torrent ${torrentName} not found in qBittorrent`); } return torrent; } @@ -439,7 +448,7 @@ export class QBittorrentClient { const fileBuffer = await readFile(localFilePath); - const blob = new Blob([fileBuffer]); // wrap Buffer in Blob (necessary for MDN FormData) + const blob = new Blob([fileBuffer as any]); // wrap Buffer in Blob (necessary for MDN FormData) form.append("torrents", blob, path.basename(localFilePath)); // this is for MDN FormData // form.append("torrents", fileBuffer); // this is for npm:form-data @@ -463,6 +472,69 @@ export class QBittorrentClient { console.log('__addTorrent success.'); } + /* + * @see https://github.com/qbittorrent/qBittorrent/wiki/WebUI-API-(qBittorrent-4.1)#delete-torrents + * @gripe (eww wtf qBittorrent, this isn't RESTful. If we're deleting a resource, we should be using DELETE. Oh well.) + * + * @param id {String} - a hash or a torrent name + */ + async deleteTorrent(id: string): Promise { + await this.connect(); + + if (!this.sidCookie) { + throw new Error('Not logged in. sidCookie missing.'); + } + + // Detect if `id` is already a 40-character hex hash + const isHexHash = /^[a-f0-9]{40}$/.test(id); + + let hashToDelete: string; + + if (isHexHash) { + // Already a hash → safe to delete + hashToDelete = id; + } else { + // Not a hash → treat as name → look up hash + const info = await this.__getTorrentInfos(id); + console.log('info', info); + hashToDelete = info.hash; + } + + console.log(`deleting ${id} (${hashToDelete})`); + await this.__deleteTorrent(hashToDelete); + + console.log(`deleteTorrent success for: ${hashToDelete}`); + } + + /** + * + * @param hashes {String} - Example: 8c212779b4abde7c6bc608063a0d008b7e40ce32|54eddd830a5b58480a6143d616a97e3a6c23c439 + */ + private async __deleteTorrent(hashes: string): Promise { + if (!hashes) throw new Error('__deleteTorrent hashes arg missing'); + if (!this.sidCookie) throw new Error('__deleteTorrent missing sidCookie'); + + console.log(`deleting hashes`, hashes) + + const body = new URLSearchParams({ + hashes, + deleteFiles: "false", + }); + + const res = await fetch(`${this.baseUrl}/api/v2/torrents/delete?hashes=${hashes}&deleteFiles=false`, { + method: "POST", + headers: { + Cookie: this.sidCookie, + }, + body + }); + + if (!res.ok) { + const body = await res.text(); + throw new Error(`__deleteTorrent failed: ${res.status} ${res.statusText} ${body}`); + } + } + /** * * @deprecated use __getTorrentInfos instead diff --git a/services/worker/src/util/sftp.ts b/services/worker/src/util/sftp.ts index 2a58a9bd..bc5cba0e 100644 --- a/services/worker/src/util/sftp.ts +++ b/services/worker/src/util/sftp.ts @@ -86,7 +86,11 @@ export class SSHClient { * @returns {Promise} * @throws {Error} If the upload fails. */ - async uploadFile(localFilePath: string, remoteDir: string): Promise { + async uploadFile( + localFilePath: string, + remoteDir: string, + onProgress?: (info: { transferred: number; total: number; percent: number }) => void + ): Promise { console.log(`Uploading localFilePath=${localFilePath} to remoteDir=${remoteDir}...`); console.log('awaiting connect') @@ -104,7 +108,21 @@ export class SSHClient { console.log(`remoteFilePath=${remoteFilePath}`) await new Promise((resolve, reject) => { - sftp.fastPut(localFilePath, remoteFilePath, (err) => (err ? reject(err) : resolve())); + sftp.fastPut( + localFilePath, + remoteFilePath, + { + step: (transferred, chunk, total) => { + const percent = (transferred / total) * 100; + + // Call user-supplied callback if provided + if (onProgress) { + onProgress({ transferred, total, percent }); + } + } + }, + (err) => (err ? reject(err) : resolve()) + ); }); }