fp/packages/capture/src/Voddo.js

243 lines
7.7 KiB
JavaScript

import 'dotenv/config'
import YoutubeDlWrap from "youtube-dl-wrap";
import { EventEmitter } from 'node:events';
import { AbortController } from "node-abort-controller";
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path'
import ffmpeg from 'fluent-ffmpeg'
import { loggerFactory } from 'common/logger'
const logger = loggerFactory({
service: 'futureporn/capture'
})
const defaultStats = {segments:[],lastUpdatedAt:null}
export default class Voddo extends EventEmitter {
constructor(opts) {
super()
this.courtesyTimer = setTimeout(() => {}, 0);
this.retryCount = 0;
this.url = opts.url;
this.format = opts.format || 'best';
this.cwd = opts.cwd;
this.ytdlee; // event emitter for ytdlwrap
this.stats = Object.assign({}, defaultStats);
this.abortController = new AbortController();
this.ytdl = opts.ytdl || new YoutubeDlWrap();
if (process.env.YOUTUBE_DL_BINARY) this.ytdl.setBinaryPath(process.env.YOUTUBE_DL_BINARY);
}
static async getVideoLength (filePath) {
return new Promise((resolve, reject) => {
ffmpeg.ffprobe(filePath, function(err, metadata) {
if (err) reject(err)
resolve(Math.floor(metadata.format.duration*1000))
});
})
}
// greets ChatGPT
static groupStreamSegments(segments, threshold = 1000*60*60) {
segments.sort((a, b) => a.startTime - b.startTime);
const streams = [];
let currentStream = [];
for (let i = 0; i < segments.length; i++) {
const currentSegment = segments[i];
const previousSegment = currentStream[currentStream.length - 1];
if (!previousSegment || currentSegment.startTime - previousSegment.endTime <= threshold) {
currentStream.push(currentSegment);
} else {
streams.push(currentStream);
currentStream = [currentSegment];
}
}
streams.push(currentStream);
return streams;
}
/**
* getRecordedStreams
*
* get the metadata of the videos captured
*/
async getRecordedSegments() {
let f = []
const files = await readdir(this.cwd).then((raw) => raw.filter((f) => /\.mp4$/.test(f) ))
for (const file of files) {
const filePath = join(this.cwd, file)
const s = await stat(filePath)
const videoDuration = await Voddo.getVideoLength(filePath)
const startTime = parseInt(s.ctimeMs)
const endTime = startTime+videoDuration
const size = s.size
f.push({
startTime,
endTime,
file,
size
})
}
this.stats.segments = f
return this.stats.segments
}
isDownloading() {
// if there are event emitter listeners for the progress event,
// we are probably downloading.
return (
this.ytdlee?.listeners('progress').length !== undefined
)
}
delayedStart() {
// only for testing
this.retryCount = 500
this.courtesyTimer = this.getCourtesyTimer(() => this.download())
}
start() {
// if download is in progress, do nothing
if (this.isDownloading()) {
logger.log({ level: 'debug', message: 'Doing nothing because a download is in progress.' })
return;
}
// if download is not in progress, start download immediately
// reset the retryCount so the backoff timer resets to 1s between attempts
this.retryCount = 0
clearTimeout(this.courtesyTimer)
// create new abort controller
//this.abortController = new AbortController() // @todo do i need this? Can't I reuse the existing this.abortController?
this.download()
}
stop() {
logger.log({ level: 'info', message: 'Received stop(). Stopping.' })
clearTimeout(this.courtesyTimer)
this.abortController.abort()
}
/** generate a report **/
getReport(errorMessage) {
let report = {}
report.stats = Object.assign({}, this.stats)
report.error = errorMessage
report.reason = (() => {
if (errorMessage) return 'error';
else if (this.abortController.signal.aborted) return 'aborted';
else return 'close';
})()
// clear stats to prepare for next run
this.stats = Object.assign({}, defaultStats)
return report
}
emitReport(report) {
logger.log({ level: 'debug', message: 'EMITTING REPORT' })
this.emit('stop', report)
}
getCourtesyTimer(callback) {
// 600000ms = 10m
const waitTime = Math.min(600000, (Math.pow(2, this.retryCount) * 1000));
this.retryCount += 1;
logger.log({ level: 'debug', message: `courtesyWait for ${waitTime/1000} seconds. (retryCount: ${this.retryCount})` })
return setTimeout(callback, waitTime)
}
download() {
const handleProgress = (progress) => {
logger.log({ level: 'debug', message:` [*] progress event` })
this.stats.lastUpdatedAt = Date.now(),
this.stats.totalSize = progress.totalSize
}
const handleError = (error) => {
if (error?.message !== undefined && error.message.includes('Room is currently offline')) {
logger.log({ level: 'debug', message: 'Handled an expected \'Room is offline\' error' })
} else {
logger.log({ level: 'error', message: 'ytdl error' })
logger.log({ level: 'error', message: error.message })
}
this.ytdlee.off('progress', handleProgress)
this.ytdlee.off('handleYtdlEvent', handleYtdlEvent)
// restart the download after the courtesyTimeout
this.courtesyTimer = this.getCourtesyTimer(() => this.download())
this.emitReport(this.getReport(error.message))
}
const handleYtdlEvent = (type, data) => {
logger.log({ level: 'debug', message: `handleYtdlEvent type: ${type}, data: ${data}` })
logger.log({ level: 'debug', message: `handleYtdlEvent type: ${type}, data: ${data}` })
if (type === 'download' && data.includes('Destination:')) {
let filePath = /Destination:\s(.*)$/.exec(data)[1]
logger.log({ level: 'debug', message: `Destination file detected: ${filePath}` })
let datum = { file: filePath, timestamp: new Date().valueOf() }
let segments = this.stats.segments
segments.push(datum) && segments.length > 64 && segments.shift(); // limit the size of the segments array
this.emit('start', datum)
} else if (type === 'ffmpeg' && data.includes('bytes')) {
const bytes = /(\d*)\sbytes/.exec(data)[1]
logger.log({ level: 'debug', message: `ffmpeg reports ${bytes}`})
let mostRecentFile = this.stats.segments[this.stats.segments.length-1]
mostRecentFile['size'] = bytes
logger.log({ level: 'debug', message: mostRecentFile })
}
}
const handleClose = () => {
logger.log({ level: 'debug', message: 'got a close event. handling!' });
this.ytdlee.off('progress', handleProgress)
this.ytdlee.off('handleYtdlEvent', handleYtdlEvent)
// restart Voddo only if the close was not due to stop()
if (!this.abortController.signal.aborted) {
// restart the download after the courtesyTimeout
this.courtesyTimer = this.getCourtesyTimer(() => this.download())
}
this.emitReport(this.getReport())
}
logger.log({ level: 'debug', message: `Downloading url:${this.url} format:${this.format}` })
logger.log({ level: 'debug', message: JSON.stringify(this.ytdl) })
// sanity check. ensure cwd exists
stat(this.cwd, (err) => {
if (err) logger.log({ level: 'error', message: `Error while getting cwd stats of ${this.cwd} Does it exist?` })
})
this.ytdlee = this.ytdl.exec(
[this.url, '-f', this.format],
{
cwd: this.cwd
},
this.abortController.signal
);
this.ytdlee.on('progress', handleProgress);
this.ytdlee.on('youtubeDlEvent', handleYtdlEvent);
this.ytdlee.once('error', handleError);
this.ytdlee.once('close', handleClose);
}
}