156 lines
3.8 KiB
JavaScript
Executable File
156 lines
3.8 KiB
JavaScript
Executable File
#!/usr/bin/env node
|
|
|
|
// import Capture from './src/Capture.js'
|
|
// import Video from './src/Video.js'
|
|
|
|
import dotenv from 'dotenv'
|
|
dotenv.config()
|
|
import { createId } from '@paralleldrive/cuid2'
|
|
import os from 'os'
|
|
import fs from 'node:fs'
|
|
import { loggerFactory } from "./src/logger.js"
|
|
import { verifyStorage } from './src/disk.js'
|
|
import faye from 'faye'
|
|
import { record, assertDependencyDirectory, checkFFmpeg } from './src/record.js'
|
|
import fastq from 'fastq'
|
|
import pRetry from 'p-retry';
|
|
import Fastify from 'fastify';
|
|
|
|
|
|
|
|
// Create a map to store the work queues
|
|
const workQueues = new Map();
|
|
|
|
|
|
|
|
async function captureTask (args, cb) {
|
|
const { appContext, playlistUrl, roomName } = args;
|
|
|
|
try {
|
|
const downloadStream = async () => {
|
|
const rc = await record(appContext, playlistUrl, roomName)
|
|
if (rc !== 0) throw new Error('ffmpeg exited irregularly (return code was other than zero)')
|
|
}
|
|
await pRetry(downloadStream, {
|
|
retries: 3,
|
|
onFailedAttempt: error => {
|
|
appContext.logger.log({ level: 'error', message: `downloadStream attempt ${error.attemptNumber} failed. There are ${error.retriesLeft} retries left.` });
|
|
},
|
|
})
|
|
} catch (e) {
|
|
// we can get here if all retries are exhausted.
|
|
// this could be that the stream is over, the playlistUrl might be different, etc.
|
|
// we might have queued tasks so we don't want to crash.
|
|
appContext.logger.log({ level: 'error', message: `downloadStream exhausted all retries.` })
|
|
appContext.logger.log({ level: 'error', message: e })
|
|
}
|
|
|
|
verifyStorage(appContext)
|
|
|
|
appContext.logger.log({ level: 'info', message: 'Capture task complete'})
|
|
cb(null, null)
|
|
}
|
|
|
|
/**
|
|
*
|
|
* Fastify is used to facilitate Docker health checks
|
|
*
|
|
*/
|
|
async function initFastify(appContext) {
|
|
appContext.fastify = Fastify({
|
|
logger: true
|
|
})
|
|
|
|
// Declare a route
|
|
appContext.fastify.get('/health', function (_, reply) {
|
|
reply.send({ message: 'futureporn-capture sneed' });
|
|
})
|
|
|
|
// Run the server!
|
|
appContext.fastify.listen({ port: appContext.env.PORT }, function (err, address) {
|
|
if (err) {
|
|
appContext.fastify.log.error(err)
|
|
process.exit(1)
|
|
}
|
|
})
|
|
}
|
|
|
|
async function init () {
|
|
|
|
const appEnv = new Array(
|
|
'FUTUREPORN_WORKDIR',
|
|
'DOWNLOADER_UA',
|
|
'PORT'
|
|
)
|
|
|
|
const logger = loggerFactory({
|
|
service: 'futureporn/capture'
|
|
})
|
|
|
|
|
|
const appContext = {
|
|
env: appEnv.reduce((acc, ev) => {
|
|
if (typeof process.env[ev] === 'undefined') throw new Error(`${ev} is undefined in env`);
|
|
acc[ev] = process.env[ev];
|
|
return acc;
|
|
}, {}),
|
|
logger,
|
|
pkg: JSON.parse(fs.readFileSync('./package.json', { encoding: 'utf-8'})),
|
|
workerId: `${os.hostname}-${createId()}`,
|
|
};
|
|
|
|
await initFastify(appContext);
|
|
|
|
assertDependencyDirectory(appContext)
|
|
await checkFFmpeg(appContext)
|
|
verifyStorage(appContext)
|
|
|
|
return appContext
|
|
}
|
|
|
|
|
|
async function main () {
|
|
|
|
const appContext = await init()
|
|
|
|
|
|
|
|
appContext.logger.log({ level: 'info', message: `capture version: ${appContext.pkg.version}` })
|
|
appContext.logger.log({ level: 'info', message: `my capture directory is ${appContext.env.FUTUREPORN_WORKDIR}` })
|
|
|
|
|
|
// connect to realtime server
|
|
appContext.pubsub.subscribe('/signals', (message) => {
|
|
appContext.logger.log({ level: 'debug', message: JSON.stringify(message) })
|
|
|
|
if (
|
|
(message?.signal === 'start') &&
|
|
(message?.room) &&
|
|
(message?.url.startsWith('https://'))
|
|
) {
|
|
|
|
const roomName = message.room;
|
|
const playlistUrl = message.url;
|
|
|
|
// Check if a work queue for the room already exists, otherwise create a new one
|
|
if (!workQueues.has(roomName)) {
|
|
workQueues.set(roomName, fastq(captureTask, 1));
|
|
}
|
|
|
|
|
|
// Push the task to the corresponding work queue
|
|
workQueues.get(roomName).push({ appContext, playlistUrl, roomName });
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
}
|
|
|
|
main()
|
|
|
|
|
|
|
|
|