fp/packages/capture/index.js

155 lines
3.8 KiB
JavaScript
Executable File

#!/usr/bin/env node
// import Capture from './src/Capture.js'
// import Video from './src/Video.js'
import dotenv from 'dotenv'
dotenv.config()
import { createId } from '@paralleldrive/cuid2'
import os from 'os'
import fs from 'node:fs'
import { loggerFactory } from "./src/logger.js"
import { verifyStorage } from './src/disk.js'
import { record, assertDependencyDirectory, checkFFmpeg } from './src/record.js'
import fastq from 'fastq'
import pRetry from 'p-retry';
import Fastify from 'fastify';
// Work queues keyed by room name. main() creates one fastq queue
// (concurrency 1, worker captureTask) per room the first time a start
// signal for that room is seen, and reuses it for later signals.
const workQueues = new Map();
/**
 * fastq worker: record one stream, then report completion through `cb`.
 *
 * `record` is invoked through `pRetry` (retries: 3); a non-zero return code
 * from `record` counts as a failed attempt. When every attempt fails the error
 * is logged rather than rethrown, so tasks still queued behind this one can
 * run.
 *
 * `cb` is always invoked exactly once. A failure in the post-capture
 * housekeeping (storage verification / final log line) is logged and passed to
 * `cb` as the error instead of escaping as an unhandled rejection, which would
 * leave the queue waiting forever for a callback that never comes.
 *
 * @param {object} args - Task payload pushed onto the queue.
 * @param {object} args.appContext - Application context; its `logger` is used
 *   here and the whole object is forwarded to `record` and `verifyStorage`.
 * @param {string} args.playlistUrl - Playlist URL forwarded to `record`.
 * @param {*} args.roomName - Room identifier forwarded to `record`.
 * @param {Function} cb - Node-style completion callback `(err, result)`.
 * @returns {Promise<void>}
 */
async function captureTask (args, cb) {
  const { appContext, playlistUrl, roomName } = args;

  try {
    const downloadStream = async () => {
      const rc = await record(appContext, playlistUrl, roomName);
      if (rc !== 0) throw new Error('ffmpeg exited irregularly (return code was other than zero)');
    };
    await pRetry(downloadStream, {
      retries: 3,
      onFailedAttempt: (error) => {
        appContext.logger.log({ level: 'error', message: `downloadStream attempt ${error.attemptNumber} failed. There are ${error.retriesLeft} retries left.` });
      },
    });
  } catch (e) {
    // We get here once all retries are exhausted: the stream may be over,
    // the playlistUrl may have changed, etc. Other tasks may be queued, so
    // log and carry on instead of crashing.
    appContext.logger.log({ level: 'error', message: `downloadStream exhausted all retries.` });
    appContext.logger.log({ level: 'error', message: e });
  }

  // Housekeeping must never prevent cb from firing; report its failure
  // through the callback instead.
  let taskError = null;
  try {
    verifyStorage(appContext);
    appContext.logger.log({ level: 'info', message: 'Capture task complete' });
  } catch (e) {
    taskError = e;
    appContext.logger.log({ level: 'error', message: 'post-capture housekeeping failed.' });
    appContext.logger.log({ level: 'error', message: e });
  }
  cb(taskError, null);
}
/**
 * Start the Fastify server used to facilitate Docker health checks.
 *
 * Creates a Fastify instance with logging enabled, stores it on
 * `appContext.fastify`, registers `GET /health`, and begins listening on
 * `appContext.env.PORT`. The listen call is callback-based and is not
 * awaited, so this function returns before the server is known to be up.
 * If listening fails, the error is logged and the process exits with code 1.
 *
 * @param {object} appContext - Application context; reads `env.PORT`, writes `fastify`.
 * @returns {Promise<void>}
 */
async function initFastify(appContext) {
  const server = Fastify({ logger: true });
  appContext.fastify = server;

  // Health-check route
  server.get('/health', (_request, reply) => {
    reply.send({ message: 'futureporn-capture sneed' });
  });

  // A failure to bind the port is fatal.
  const onListen = (err, _address) => {
    if (!err) return;
    server.log.error(err);
    process.exit(1);
  };

  server.listen({ port: appContext.env.PORT }, onListen);
}
/**
 * Build the application context and run the startup checks.
 *
 * Steps, in order:
 *   1. create the logger,
 *   2. copy the required environment variables into `env`,
 *   3. read `./package.json` (resolved against the current working directory),
 *   4. start the health-check server via `initFastify`,
 *   5. run `assertDependencyDirectory`, `checkFFmpeg` and `verifyStorage`.
 *
 * @returns {Promise<object>} The context: `env`, `logger`, `pkg`, `workerId`,
 *   plus whatever `initFastify` attaches to it.
 * @throws {Error} If a required environment variable is undefined, or if
 *   reading/parsing package.json or any of the startup checks throws.
 */
async function init () {
  // Environment variables that must be defined for the service to start.
  const appEnv = [
    'FUTUREPORN_WORKDIR',
    'DOWNLOADER_UA',
    'PORT',
  ];

  const logger = loggerFactory({
    service: 'futureporn/capture'
  });

  const env = {};
  for (const ev of appEnv) {
    if (typeof process.env[ev] === 'undefined') throw new Error(`${ev} is undefined in env`);
    env[ev] = process.env[ev];
  }

  const appContext = {
    env,
    logger,
    pkg: JSON.parse(fs.readFileSync('./package.json', { encoding: 'utf-8' })),
    // os.hostname is a function and must be called; interpolating the bare
    // reference would embed the function's source text in the id.
    workerId: `${os.hostname()}-${createId()}`,
  };

  await initFastify(appContext);
  assertDependencyDirectory(appContext);
  await checkFFmpeg(appContext);
  verifyStorage(appContext);
  return appContext;
}
/**
 * Entry point: initialise the app, then queue a capture task for every valid
 * start signal received on the '/signals' channel.
 *
 * A signal is acted on only when `signal === 'start'`, `room` is truthy and
 * `url` is a string beginning with 'https://'. Anything else (including a
 * start signal whose `url` is missing or not a string) is ignored rather than
 * throwing inside the subscription handler.
 *
 * Tasks are pushed onto a per-room queue with concurrency 1, so captures for
 * the same room run one at a time.
 *
 * @returns {Promise<void>}
 */
async function main () {
  const appContext = await init();
  appContext.logger.log({ level: 'info', message: `capture version: ${appContext.pkg.version}` });
  appContext.logger.log({ level: 'info', message: `my capture directory is ${appContext.env.FUTUREPORN_WORKDIR}` });

  // connect to realtime server
  // NOTE(review): nothing visible in init() assigns appContext.pubsub --
  // confirm where it is attached; if it is not, this line throws.
  appContext.pubsub.subscribe('/signals', (message) => {
    appContext.logger.log({ level: 'debug', message: JSON.stringify(message) });

    const isStartSignal =
      message?.signal === 'start' &&
      Boolean(message?.room) &&
      typeof message?.url === 'string' &&
      message.url.startsWith('https://');
    if (!isStartSignal) return;

    const roomName = message.room;
    const playlistUrl = message.url;

    // Reuse the room's work queue if it exists, otherwise create one.
    if (!workQueues.has(roomName)) {
      workQueues.set(roomName, fastq(captureTask, 1));
    }

    // Push the task to the corresponding work queue
    workQueues.get(roomName).push({ appContext, playlistUrl, roomName });
  });
}
// Start the service. A startup failure is fatal: log it and exit non-zero
// explicitly rather than leaving the rejection unhandled, so the process
// cannot linger with the health endpoint up but no capture running.
main().catch((err) => {
  console.error(err);
  process.exit(1);
});