import updateSegmentInDatabase from '@futureporn/fetchers/updateSegmentInDatabase.ts' import { Helpers, type Task } from 'graphile-worker' import Record from '../Record.ts' import type { SegmentResponse } from '@futureporn/types' import { configs } from '../config.ts' import { createId } from '@paralleldrive/cuid2' import createSegmentInDatabase from '@futureporn/fetchers/createSegmentInDatabase.ts' import createSegmentsVodLink from '@futureporn/fetchers/createSegmentsVodLink.ts' import getPlaylistUrl from '@futureporn/fetchers/getPlaylistUrl.ts' import getVod from '@futureporn/fetchers/getVod.ts' import RecordNextGeneration from '../RecordNextGeneration.ts' /** * url is the URL to be recorded. Ex: chaturbate.com/projektmelody * recordId is the ID of the record record in postgres * we use the ID to poll the db to see if the job is aborted by the user */ interface Payload { url: string; vod_id: string; } function assertPayload(payload: any): asserts payload is Payload { if (typeof payload !== "object" || !payload) throw new Error("invalid payload"); if (typeof payload.url !== "string") throw new Error("invalid url"); if (typeof payload.vod_id !== "string") throw new Error(`invalid vod_id=${payload.vod_id}`); } // async function getRecordInstance(url: string, segment_id: string, helpers: Helpers) { // helpers.logger.info(`getRecordInstance() with url=${url}, segment_id=${segment_id}`) // const abortController = new AbortController() // const abortSignal = abortController.signal // const accessKeyId = configs.s3AccessKeyId; // const secretAccessKey = configs.s3SecretAccessKey; // const region = configs.s3Region; // const endpoint = configs.s3Endpoint; // const bucket = configs.s3UscBucket; // const playlistUrl = await getPlaylistUrl(url) // if (!playlistUrl) throw new Error('failed to getPlaylistUrl'); // helpers.logger.info(`playlistUrl=${playlistUrl}`) // const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint }) // const inputStream = Record.getFFmpegStream({ url: playlistUrl }) // const onProgress = (fileSize: number) => { // updateSegmentInDatabase({ segment_id, fileSize, helpers }) // .then(checkIfAborted) // .then((isAborted) => { // isAborted ? abortController.abort() : null // }) // .catch((e) => { // helpers.logger.error('caught error while updatingDatabaseRecord inside onProgress inside getRecordInstance') // helpers.logger.error(e) // }) // } // const record = new Record({ inputStream, onProgress, bucket, s3Client, jobId: ''+segment_id, abortSignal }) // return record // } // function checkIfAborted(segment: Partial): boolean { // return (!!segment?.vod?.is_recording_aborted) // } /** * # doRecordSegment * * Record a segment of a livestream using ffmpeg. * * Ideally, we record the entire livestream, but the universe is not so kind. Network interruptions are common, so we handle the situation as best as we can. * * This function creates a new segments and vods_segments_links entry in the db via Postgrest REST API. * * This function also names the S3 file (s3_key) with a datestamp and a cuid. */ // const doRecordSegment = async function doRecordSegment(url: string, vod_id: string, helpers: Helpers) { // const s3_key = `${new Date().toISOString()}-${createId()}.ts` // helpers.logger.info(`let's create a segment using vod_id=${vod_id}, url=${url}`) // const segment_id = await createSegmentInDatabase(s3_key, vod_id) // helpers.logger.info(`let's create a segmentsStreamLink...`) // const segmentsVodLinkId = await createSegmentsVodLink(vod_id, segment_id) // helpers.logger.info(`doTheRecording with createSegmentsVodLink segmentsVodLinkId=${segmentsVodLinkId}, vod_id=${vod_id}, segment_id=${segment_id}, url=${url}`) // // no try-catch block here, because we need any errors to bubble up. // const record = await getRecordInstance(url, segment_id) // helpers.logger.info(`we got a Record instance. now we record.start()`) // // console.log(record) // return record.start() // } export const record: Task = async function (payload: unknown, helpers: Helpers) { assertPayload(payload) const { url, vod_id } = payload const vodId = vod_id const playlistUrl = await getPlaylistUrl(url) if (!playlistUrl) throw new Error(`failed to get playlistUrl using url=${url}`) /** * RecordNextGeneration handles errors for us and re-throws ones that should fail the Task. * We intentionally do not use a try/catch block here. */ const recordNG = new RecordNextGeneration({ playlistUrl, vodId }) await recordNG.done() return; // try { // // if the VOD has been aborted, end Task with success // if ((await getVod(vod_id, helpers))?.is_recording_aborted) return; // /** // * We do an exponential backoff timer when we record. If the Record() instance throws an error, we try again after a delay. // * This will take effect only when Record() throws an error. // * If however Record() returns, as is the case when the stream ends, this backoff timer will not retry. // * This does not handle the corner case where the streamer's internet temporarliy goes down, and their stream drops. // * // * @todo We must implement retrying at a higher level, and retry a few times to handle this type of corner-case. // */ // // await backOff(() => doRecordSegment(url, recordId, helpers)) // await doRecordSegment(url, vodId, helpers) // } catch (e) { // // await updateDatabaseRecord({ recordId: vod_id, recordingState: 'failed' }) // helpers.logger.error(`caught an error during record Task`) // if (e instanceof Error) { // helpers.logger.info(`error.name=${e.name}`) // if (e.name === 'RoomOfflineError') { // // If room is offline, we want to retry until graphile-worker retries expire. // // We don't want to swallow the error so we simply log the error then let the below throw re-throw the error // // graphile-worker will retry when we re-throw the error below. // helpers.logger.info(`Room is offline.`) // } else if (e.name === 'AbortError') { // // If the recording was aborted by an admin, we want graphile-worker to stop retrying the record job. // // We swallow the error and return in order to mark the job as succeeded. // helpers.logger.info(`>>> we got an AbortError so we are ending the record job.`) // return // } else { // helpers.logger.error(e.message) // } // } else { // helpers.logger.error(JSON.stringify(e)) // } // // we throw the error which fails the graphile-worker job, thus causing graphile-worker to restart/retry the job. // helpers.logger.error(`we got an error during record Task so we throw and retry`) // throw e // } // helpers.logger.info('record Task has finished') } export default record