#!/usr/bin/env node

// import Capture from './src/Capture.js'
// import Video from './src/Video.js'
import dotenv from 'dotenv';
import { createId } from '@paralleldrive/cuid2';
import os from 'os';
import fs from 'node:fs';
import { loggerFactory } from './src/logger.js';
import { verifyStorage } from './src/disk.js';
import { record, assertDependencyDirectory, checkFFmpeg } from './src/record.js';
import fastq from 'fastq';
import pRetry from 'p-retry';
import Fastify from 'fastify';

// Import declarations are hoisted, so this call runs only after every imported
// module has been evaluated, regardless of where it is written in the file.
dotenv.config();

// Create a map to store the work queues (one fastq queue per room name).
const workQueues = new Map();

/**
 * Queue worker: records one stream, retrying when the recording fails.
 *
 * `record` is retried up to 3 times via p-retry whenever it rejects or
 * resolves with a non-zero return code. Exhausting the retries is logged but
 * deliberately not treated as fatal, because other tasks may still be queued.
 *
 * @param {object} args
 * @param {object} args.appContext - Shared application context; `logger` is used here.
 * @param {string} args.playlistUrl - Forwarded to `record`.
 * @param {string} args.roomName - Forwarded to `record`.
 * @param {Function} cb - Queue completion callback. It is always invoked so the
 *   queue can move on: `cb(null, null)` normally, `cb(err, null)` when the
 *   post-capture storage check throws.
 */
async function captureTask(args, cb) {
  const { appContext, playlistUrl, roomName } = args;

  const downloadStream = async () => {
    const rc = await record(appContext, playlistUrl, roomName);
    if (rc !== 0) throw new Error('ffmpeg exited irregularly (return code was other than zero)');
  };

  try {
    await pRetry(downloadStream, {
      retries: 3,
      onFailedAttempt: (error) => {
        appContext.logger.log({
          level: 'error',
          message: `downloadStream attempt ${error.attemptNumber} failed. There are ${error.retriesLeft} retries left.`,
        });
      },
    });
  } catch (e) {
    // We can get here if all retries are exhausted.
    // This could be that the stream is over, the playlistUrl might be different, etc.
    // We might have queued tasks so we don't want to crash.
    appContext.logger.log({ level: 'error', message: `downloadStream exhausted all retries.` });
    appContext.logger.log({ level: 'error', message: e });
  }

  try {
    verifyStorage(appContext);
  } catch (e) {
    // Without this, a throw here would skip cb() and stall the room's queue
    // (concurrency is 1) while also leaving this async worker's rejection unhandled.
    appContext.logger.log({ level: 'error', message: e });
    cb(e, null);
    return;
  }

  appContext.logger.log({ level: 'info', message: 'Capture task complete' });
  cb(null, null);
}

/**
 * Fastify is used to facilitate Docker health checks.
 *
 * Creates the server, stores it on `appContext.fastify`, registers
 * `GET /health`, and resolves once the server is listening on
 * `appContext.env.PORT`. If listening fails, the error is logged and the
 * process exits with code 1.
 *
 * @param {object} appContext - Shared application context; reads `env.PORT`, sets `fastify`.
 * @returns {Promise<void>}
 */
async function initFastify(appContext) {
  const server = Fastify({ logger: true });
  appContext.fastify = server;

  // Declare a route
  server.get('/health', (_, reply) => {
    reply.send({ message: 'futureporn-capture sneed' });
  });

  // Run the server!
  try {
    await server.listen({ port: appContext.env.PORT });
  } catch (err) {
    server.log.error(err);
    process.exit(1);
  }
}

/**
 * Builds the application context and runs the startup checks.
 *
 * @returns {Promise<object>} Context with `env`, `logger`, `pkg`, `workerId`
 *   and (set by `initFastify`) `fastify`.
 * @throws {Error} When a required environment variable is undefined.
 */
async function init() {
  const requiredEnv = ['FUTUREPORN_WORKDIR', 'DOWNLOADER_UA', 'PORT'];

  const logger = loggerFactory({ service: 'futureporn/capture' });

  const env = {};
  for (const name of requiredEnv) {
    const value = process.env[name];
    if (typeof value === 'undefined') throw new Error(`${name} is undefined in env`);
    env[name] = value;
  }

  const appContext = {
    env,
    logger,
    // Relative path: resolved against the process working directory.
    pkg: JSON.parse(fs.readFileSync('./package.json', { encoding: 'utf-8' })),
    // os.hostname is a function and must be called; interpolating the bare
    // reference would embed the function's source text instead of the host name.
    workerId: `${os.hostname()}-${createId()}`,
  };

  await initFastify(appContext);
  assertDependencyDirectory(appContext);
  await checkFFmpeg(appContext);
  verifyStorage(appContext);

  return appContext;
}

/**
 * Decides whether a pubsub message asks for a capture to start.
 *
 * @param {*} message - Raw message from the `/signals` channel.
 * @returns {boolean} True when `signal` is 'start', `room` is truthy and `url`
 *   is a string beginning with 'https://'.
 */
function isStartSignal(message) {
  return (
    message?.signal === 'start' &&
    Boolean(message.room) &&
    // A start signal without a url must be ignored rather than throw.
    typeof message.url === 'string' &&
    message.url.startsWith('https://')
  );
}

/**
 * Entry point: initialises the app and subscribes to `/signals`, queueing one
 * capture task per valid start signal on that room's queue.
 *
 * @returns {Promise<void>}
 * @throws {Error} When `appContext.pubsub` has no `subscribe` function.
 */
async function main() {
  const appContext = await init();
  appContext.logger.log({ level: 'info', message: `capture version: ${appContext.pkg.version}` });
  appContext.logger.log({ level: 'info', message: `my capture directory is ${appContext.env.FUTUREPORN_WORKDIR}` });

  // NOTE(review): nothing in this file assigns appContext.pubsub. Unless one of
  // the imported helpers attaches it, the realtime client still needs wiring up.
  if (typeof appContext.pubsub?.subscribe !== 'function') {
    throw new Error('appContext.pubsub is not configured; cannot subscribe to /signals');
  }

  // connect to realtime server
  appContext.pubsub.subscribe('/signals', (message) => {
    appContext.logger.log({ level: 'debug', message: JSON.stringify(message) });

    if (!isStartSignal(message)) return;

    const roomName = message.room;
    const playlistUrl = message.url;

    // Check if a work queue for the room already exists, otherwise create a new one.
    // Concurrency 1 means captures for the same room run one at a time.
    if (!workQueues.has(roomName)) {
      workQueues.set(roomName, fastq(captureTask, 1));
    }

    // Push the task to the corresponding work queue
    workQueues.get(roomName).push({ appContext, playlistUrl, roomName });
  });
}

// Startup failures are reported explicitly instead of leaving the promise floating.
main().catch((err) => {
  console.error(err);
  process.exit(1);
});