import { Helpers, type Task } from 'graphile-worker' import Record from '../Record.ts' import { getPlaylistUrl } from '@futureporn/scout/ytdlp.ts' import type { RecordingState } from '@futureporn/types' /** * url is the URL to be recorded. Ex: chaturbate.com/projektmelody * recordId is the ID of the record record in postgres * we use the ID to poll the db to see if the job is aborted by the user */ interface Payload { url: string, recordId: number } interface RecordingRecord { id: number; isAborted: boolean; } function assertPayload(payload: any): asserts payload is Payload { if (typeof payload !== "object" || !payload) throw new Error("invalid payload"); if (typeof payload.url !== "string") throw new Error("invalid url"); if (typeof payload.recordId !== "number") throw new Error(`invalid recordId=${payload.recordId}`); } function assertEnv() { if (!process.env.S3_ACCESS_KEY_ID) throw new Error('S3_ACCESS_KEY_ID was missing in env'); if (!process.env.S3_SECRET_ACCESS_KEY) throw new Error('S3_SECRET_ACCESS_KEY was missing in env'); if (!process.env.S3_REGION) throw new Error('S3_REGION was missing in env'); if (!process.env.S3_ENDPOINT) throw new Error('S3_ENDPOINT was missing in env'); if (!process.env.S3_BUCKET) throw new Error('S3_BUCKET was missing in env'); if (!process.env.POSTGREST_URL) throw new Error('POSTGREST_URL was missing in env'); } async function getRecording(url: string, recordId: number, abortSignal: AbortSignal) { const accessKeyId = process.env.S3_ACCESS_KEY_ID!; const secretAccessKey = process.env.S3_SECRET_ACCESS_KEY!; const region = process.env.S3_REGION!; const endpoint = process.env.S3_ENDPOINT!; const bucket = process.env.S3_BUCKET!; const playlistUrl = await getPlaylistUrl(url) const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint }) const inputStream = Record.getFFmpegStream({ url: playlistUrl }) const record = new Record({ inputStream, bucket, s3Client, jobId: ''+recordId }) // @todo add abortsignal return record } async function checkIfAborted(recordId: number): Promise { const res = await fetch(`${process.env.POSTGREST_URL}/records?id=eq.${recordId}`, { headers: { 'Content-Type': 'application/json', 'Accepts': 'application/json' } }) if (!res.ok) { throw new Error(`failed to checkIfAborted. status=${res.status}, statusText=${res.statusText}`); } const body = await res.json() as RecordingRecord[]; if (!body[0]) throw new Error(`failed to get a record that matched recordId=${recordId}`) return body[0].isAborted } async function updateDatabaseRecord({recordId, state, filesize}: { recordId: number, state: RecordingState, filesize: number }): Promise { const res = await fetch(`${process.env.POSTGREST_URL}/records?id=eq.${recordId}`, { method: 'PATCH', headers: { 'Content-Type': 'application/json', 'Accepts': 'application/json', 'Prefer': 'return=representation' }, body: JSON.stringify({ state, filesize }) }) if (!res.ok) { throw new Error(`failed to checkIfAborted. status=${res.status}, statusText=${res.statusText}`); } const body = await res.json() as RecordingRecord[]; if (!body[0]) throw new Error(`failed to get a record that matched recordId=${recordId}`) return body[0].isAborted } export const record: Task = async function (payload, helpers) { console.log(payload) assertPayload(payload) assertEnv() const { url, recordId } = payload const abortController = new AbortController() let interval try { const record = await getRecording(url, recordId, abortController.signal) // every 30s, we // 1. poll db to see if our job has been aborted by the user // 2. update the db record with the RecordingState and filesize interval = setInterval(async () => { try { helpers.logger.info(`checkIfAborted()`) const isAborted = await checkIfAborted(recordId) if (isAborted) { abortController.abort() } let state: RecordingState = 'recording' } catch (e) { helpers.logger.error(`error while checking if this job was aborted. For sake of the recording in progress we are ignoring the following error. ${e}`) } }, 30000) // start recording and await the S3 upload being finished await record.start() } finally { clearInterval(interval) } // const recordId = await createRecordingRecord(payload, helpers) // const { url } = payload; // console.log(`@todo simulated start_recording with url=${url}, recordId=${recordId}`) // await helpers.addJob('record', { url, recordId }) } export default record