capture basic function

This commit is contained in:
CJ_Clippy 2024-09-20 19:01:21 -08:00
parent d89bf4fdea
commit 92cce429b3
33 changed files with 550 additions and 344 deletions

View File

@ -238,12 +238,7 @@ docker_build(
load('ext://uibutton', 'cmd_button') load('ext://uibutton', 'cmd_button')
cmd_button('postgres:create',
argv=['./scripts/postgres-create.sh'],
resource='postgresql-primary',
icon_name='dataset',
text='create (empty) databases',
)
cmd_button('postgres:restore', cmd_button('postgres:restore',
argv=['./scripts/postgres-restore.sh'], argv=['./scripts/postgres-restore.sh'],
resource='postgresql-primary', resource='postgresql-primary',
@ -257,18 +252,19 @@ cmd_button('postgres:drop',
text='DROP all databases' text='DROP all databases'
) )
cmd_button('postgres:refresh', cmd_button('postgres:refresh',
argv=['sh', './scripts/postgres-refresh.sh'], argv=['echo', '@todo please restart postgrest container manually.'],
resource='migrations', resource='migrations',
icon_name='refresh', icon_name='refresh',
text='Refresh schema cache' text='Refresh schema cache'
) )
# cmd_button('capture-api:create', ## @todo let's make this get a random room from scout then use the random room to record via POST /recordings
# argv=['http', '--ignore-stdin', 'POST', 'http://localhost:5003/api/record', "url='https://twitch.tv/ironmouse'", "channel='ironmouse'"], cmd_button('capture-worker:create',
# resource='capture-api', argv=['./scripts/capture-integration.sh'],
# icon_name='send', resource='capture-worker',
# text='Start Recording' icon_name='send',
# ) text='Recording Integration Test'
)
cmd_button('postgres:migrate', cmd_button('postgres:migrate',
argv=['./scripts/postgres-migrations.sh'], argv=['./scripts/postgres-migrations.sh'],

View File

@ -2,22 +2,6 @@
# ---
# apiVersion: v1
# kind: Service
# metadata:
# name: capture-api
# namespace: futureporn
# spec:
# type: ClusterIP
# selector:
# app.kubernetes.io/name: capture
# ports:
# - name: http
# port: {{ .Values.capture.api.port }}
# targetPort: http
# protocol: TCP
--- ---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
@ -87,48 +71,3 @@ spec:
memory: 1024Mi memory: 1024Mi
restartPolicy: Always restartPolicy: Always
# ---
# apiVersion: apps/v1
# kind: Deployment
# metadata:
# name: capture-api
# namespace: futureporn
# labels:
# app.kubernetes.io/name: capture
# spec:
# replicas: {{ .Values.capture.api.replicas }}
# selector:
# matchLabels:
# app: capture-api
# template:
# metadata:
# labels:
# app: capture-api
# spec:
# containers:
# - name: capture
# image: "{{ .Values.capture.imageName }}"
# ports:
# - name: http
# containerPort: {{ .Values.capture.api.port }}
# env:
# - name: FUNCTION
# value: api
# - name: HTTP_PROXY
# valueFrom:
# secretKeyRef:
# name: capture
# key: httpProxy
# - name: WORKER_CONNECTION_STRING
# valueFrom:
# secretKeyRef:
# name: capture
# key: workerConnectionString
# - name: PORT
# value: "{{ .Values.capture.api.port }}"
# resources:
# limits:
# cpu: 100m
# memory: 256Mi
# restartPolicy: Always

View File

@ -0,0 +1,29 @@
import { configs } from "./config.ts";
export default async function createRecording({ url, discordMessageId, date }: { url: string, discordMessageId?: string, date: Date }) {
if (!url) throw new Error('createRecording requires {string} url');
const payload = {
url,
discord_message_id: discordMessageId,
date: date.toISOString()
}
const res = await fetch(`${configs.postgrestUrl}/recordings`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Prefer': 'return=representation',
'Authorization': `Bearer ${configs.automationUserJwt}`
},
body: JSON.stringify(payload)
})
if (!res.ok) {
const body = await res.text()
const msg = `Failed to create Recording. status=${res.status}, statusText=${res.statusText}, body=${body}`
console.error(msg)
throw new Error(msg);
}
const data = await res.json() as { id: string }
return data.id
}

View File

@ -67,7 +67,7 @@ export async function createStream(): Promise<string|null> {
* *
*/ */
export default async function findOrCreateStream({ vtuberId, date, minutes = 15 }: { vtuberId: string, date: Date, minutes?: number }): Promise<string|null> { export default async function findOrCreateStream({ vtuberId, date, minutes = 15 }: { vtuberId: string, date: Date, minutes?: number }): Promise<string|null> {
console.info(`findOrCreateStream with vtuberId=${vtuberId}, date=${date.toISOString()}, minutes=${minutes}`) console.info(`findOrCreateStream with vtuberId=${vtuberId}, date=${new Date(date).toISOString()}, minutes=${minutes}`)
if (!vtuberId) throw new Error(`findOrCreateStream requires vruberId passed in the options argument.`); if (!vtuberId) throw new Error(`findOrCreateStream requires vruberId passed in the options argument.`);
if (!date) throw new Error(`findOrCreateStream requires date passed in the options argument.`); if (!date) throw new Error(`findOrCreateStream requires date passed in the options argument.`);
const gteDate = sub(date, { minutes }) const gteDate = sub(date, { minutes })

View File

@ -83,9 +83,9 @@ async function createVtuber(vtuber: Partial<VtuberRecord>): Promise<string> {
throw new Error(msg) throw new Error(msg)
} }
const json = await res.json() as VtuberResponse[] const json = await res.json() as VtuberResponse[]
console.info(`createVtuber with vtuber as follows`) // console.info(`createVtuber with vtuber as follows`)
console.info(vtuber) // console.info(vtuber)
console.info(json) // console.info(json)
const vtuberData = json[0] const vtuberData = json[0]
if (!vtuberData) throw new Error('failed to createVtuber') if (!vtuberData) throw new Error('failed to createVtuber')
return vtuberData.id return vtuberData.id
@ -97,6 +97,7 @@ export default async function findOrCreateVtuber(query: Partial<vTuberSearchQuer
const { url, name } = query const { url, name } = query
if (!url) throw new Error('findOrCreateVtuber was missing url which is required'); if (!url) throw new Error('findOrCreateVtuber was missing url which is required');
console.info(`findOrCreateVtuber. url=${url}, name=${name}`) console.info(`findOrCreateVtuber. url=${url}, name=${name}`)
new URL(url) // validate URL, throw if invalid
const foundVtuber = await findVtuber(query) const foundVtuber = await findVtuber(query)
if (!foundVtuber) { if (!foundVtuber) {

View File

@ -21,7 +21,5 @@ export default async function findVod({ vod_id, discord_message_id }: { vod_id?:
throw new Error(msg) throw new Error(msg)
} }
const json = await res.json() as VodResponse[] const json = await res.json() as VodResponse[]
// console.info(`vod results as follows.`)
// console.info(json)
return json?.at(0) || null return json?.at(0) || null
} }

View File

@ -0,0 +1,45 @@
import type { RecordingResponse } from "@futureporn/types";
import { configs } from "./config.ts";
export default async function getRecording(recordingId: string): Promise<RecordingResponse|null> {
if (!recordingId) throw new Error(`getRecording requires recordingId param, but it was missing.`);
const fetchUrl = `${configs.postgrestUrl}/recordings?id=eq.${recordingId}`
const fetchOptions = {
method: 'GET',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Prefer': 'return=representation'
}
}
const res = await fetch(fetchUrl, fetchOptions)
if (!res.ok) {
const msg = `request failed. status=${res.status}, statusText=${res.statusText}`
console.error(msg)
throw new Error(msg)
}
const json = await res.json() as RecordingResponse[]
return json?.at(0) || null
}
export async function getRecordingRelatedToVod(vodId: string): Promise<RecordingResponse|null> {
if (!vodId) throw new Error(`getRecordingRelatedToVod requires vodId param, but it was missing.`);
const fetchUrl = `${configs.postgrestUrl}/recordings?select=*,vods!inner(id)&vods.id=eq.${vodId}`
const fetchOptions = {
method: 'GET',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Prefer': 'return=representation'
}
}
const res = await fetch(fetchUrl, fetchOptions)
if (!res.ok) {
const msg = `request failed. status=${res.status}, statusText=${res.statusText}`
console.error(msg)
throw new Error(msg)
}
const json = await res.json() as RecordingResponse[]
return json?.at(0) || null
}

View File

@ -1,8 +1,8 @@
import { configs } from "./config" import { configs } from "./config"
import type { VodResponse } from "@futureporn/types" import type { VodResponse } from "@futureporn/types"
export default async function getVod(vodId: string) { export default async function getVod(vodId: string): Promise<VodResponse|null> {
const url = `${configs.postgrestUrl}/vods?select=*,segments(*)&id=eq.${vodId}` const url = `${configs.postgrestUrl}/vods?select=*,segments(*),recording:recordings(is_aborted)&id=eq.${vodId}`
try { try {
const res = await fetch(url) const res = await fetch(url)
if (!res.ok) { if (!res.ok) {

View File

@ -0,0 +1,28 @@
import type { RecordingRecord } from "@futureporn/types";
import { configs } from "./config.ts";
export default async function patchRecording(recordingId: string, payload: Partial<RecordingRecord>): Promise<void> {
const url = `${configs.postgrestUrl}/recordings?id=eq.${recordingId}`
const fetchOptions = {
method: 'PATCH',
headers: {
'Authorization': `Bearer ${configs.automationUserJwt}`,
'Content-Type': 'application/json',
'Prefer': 'return=headers-only'
},
body: JSON.stringify(payload)
}
try {
const res = await fetch(url, fetchOptions)
if (!res.ok) {
const body = await res.json()
console.error(body)
throw new Error(`Problem during patchRecording. res.status=${res.status}, res.statusText=${res.statusText}`)
}
return
} catch (e) {
console.error(e)
throw e
}
}

View File

@ -21,7 +21,7 @@ export default async function updateSegmentInDatabase({
bytes: fileSize bytes: fileSize
} }
const fetchUrl =`${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=vod:vods(is_recording_aborted)` const fetchUrl =`${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=vod:vods(recording:recordings(is_aborted))`
console.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`) console.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`)
const res = await fetch(fetchUrl, { const res = await fetch(fetchUrl, {
method: 'PATCH', method: 'PATCH',

View File

@ -120,6 +120,7 @@ export interface VodRecord {
url: string; url: string;
discord_message_id: string; discord_message_id: string;
s3_file: string; s3_file: string;
recording_id: string;
} }
export interface TagRecord { export interface TagRecord {
@ -134,6 +135,15 @@ export interface User {
id: string id: string
} }
export interface RecordingResponse {
id: string;
url: string;
date: string;
discord_interaction_id: string;
is_aborted: boolean;
vod_id: string;
}
export interface VodResponse { export interface VodResponse {
id: string; id: string;
stream: StreamResponse; stream: StreamResponse;
@ -157,9 +167,9 @@ export interface VodResponse {
note?: string; note?: string;
url: string; url: string;
segments?: SegmentResponse[]; segments?: SegmentResponse[];
recording: RecordingRecord;
status: Status; status: Status;
discord_message_id: string; discord_message_id: string;
is_recording_aborted: boolean;
} }
@ -177,7 +187,6 @@ export interface StreamRecord {
archive_status: ArchiveStatus; archive_status: ArchiveStatus;
is_chaturbate_stream: Boolean; is_chaturbate_stream: Boolean;
is_fansly_stream: Boolean; is_fansly_stream: Boolean;
is_recording_aborted: Boolean;
status: Status; status: Status;
segments?: SegmentResponse[] segments?: SegmentResponse[]
} }
@ -203,14 +212,15 @@ export interface StreamResponse {
export interface RecordingRecord { export interface RecordingRecord {
id: number; id: number;
recording_state: RecordingState;
file_size: number; file_size: number;
discord_message_id: string; discord_message_id: string;
is_recording_aborted: boolean; is_aborted: boolean;
vod_id: string;
updated_at: Date; updated_at: Date;
created_at: Date; created_at: Date;
} }
export interface SegmentResponse { export interface SegmentResponse {
id: number; id: number;
s3_key: string; s3_key: string;

View File

@ -30,7 +30,7 @@ describe('image', function () {
describe('getStoryboard', function () { describe('getStoryboard', function () {
this.timeout(1000*60*15) this.timeout(1000*60*15)
it('should accept a URL and return a path to image on disk', async function () { it('should accept a URL and return a path to image on disk', async function () {
const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-09-15.mp4' const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-09-19.mp4'
const imagePath = await getStoryboard(url) const imagePath = await getStoryboard(url)
expect(imagePath).to.match(/\.png/) expect(imagePath).to.match(/\.png/)
}) })

25
scripts/capture-integration.sh Executable file
View File

@ -0,0 +1,25 @@
#!/bin/bash
if [ -z "${AUTOMATION_USER_JWT}" ]; then
echo "Error: AUTOMATION_USER_JWT variable is not defined."
exit 1
fi
# get a random room
response=$(curl -sL --fail GET http://localhost:8134/chaturbate/random-room)
exitcode=$?
url=$(echo $response | jq -r '.url')
if [[ $exitcode -ne 0 || -z "$response" || -z "$url" ]]; then
echo "failed to get random room. exitcode=${exitcode}, response=${response}, url=${url}"
exit $exitcode
fi
echo "Random online chaturbate room url=${url}"
# create a recording
curl -sL -H "Authorization: Bearer ${AUTOMATION_USER_JWT}" \
-H "Content-Type: application/json" \
-d '{"url": "'"${url}"'"}' \
http://localhost:9000/recordings
echo "finished creating recording"

View File

@ -16,7 +16,7 @@ createCommand({
if (!message) return bot.logger.error('interaction.message was missing'); if (!message) return bot.logger.error('interaction.message was missing');
if (!message.id) return bot.logger.error(`interaction.message.id was missing`); if (!message.id) return bot.logger.error(`interaction.message.id was missing`);
const url = `${configs.postgrestUrl}/vods?discord_message_id=eq.${message.id}`; const url = `${configs.postgrestUrl}/recordings?discord_message_id=eq.${message.id}`;
const options = { const options = {
method: 'PATCH', method: 'PATCH',
headers: { headers: {
@ -26,7 +26,7 @@ createCommand({
'Authorization': `Bearer ${configs.automationUserJwt}` 'Authorization': `Bearer ${configs.automationUserJwt}`
}, },
body: JSON.stringify({ body: JSON.stringify({
is_recording_aborted: true, is_aborted: true,
status: 'aborted' as Status status: 'aborted' as Status
}) })
}; };

View File

@ -13,6 +13,7 @@ import type { StreamResponse } from '@futureporn/types'
import createVod from '@futureporn/fetchers/createVod.ts' import createVod from '@futureporn/fetchers/createVod.ts'
import findOrCreateVtuber from '@futureporn/fetchers/findOrCreateVtuber.ts' import findOrCreateVtuber from '@futureporn/fetchers/findOrCreateVtuber.ts'
import findOrCreateStream from '@futureporn/fetchers/findOrCreateStream.ts' import findOrCreateStream from '@futureporn/fetchers/findOrCreateStream.ts'
import createRecording from '@futureporn/fetchers/createRecording.ts'
/** /**
@ -114,13 +115,10 @@ createCommand({
} }
const discord_message_id = message.id.toString() const discord_message_id = message.id.toString()
const discordMessageId = discord_message_id
const date = new Date() const date = new Date()
const vtuberId = await findOrCreateVtuber({ url })
const streamId = await findOrCreateStream({ vtuberId, date }) await createRecording({ url, discordMessageId, date })
if (!streamId) throw new Error(`failed to find or create a Stream in database`);
const vod = await createVod({ stream_id: streamId, vtuber: vtuberId, url, discord_message_id, date: date.toISOString() })
if (!vod) throw new Error('failed to createVod. please try again.')
logger.info(`Success! We have created VOD id=${vod.id}`)
} catch (e) { } catch (e) {
const message = `Record failed due to the following error.\n${e}` const message = `Record failed due to the following error.\n${e}`

View File

@ -1,5 +1,13 @@
Task names uses underscores because graphile_worker expects them to be that way because graphile_worker interfaces with Postgresql which uses lowercase and numberscores. Task names uses underscores because graphile_worker expects them to be that way because graphile_worker interfaces with Postgresql which uses lowercase and numberscores.
here are some administrative functions for clearing all tasks. Also see https://worker.graphile.org/docs/admin-functions
(search tags as follows because I keep losing this file)
administrative tasks
clear all
delete all
jobs
addJob()
## Add job via SQL ## Add job via SQL

View File

@ -199,6 +199,34 @@ importers:
specifier: ^5.5.4 specifier: ^5.5.4
version: 5.5.4 version: 5.5.4
../..: {}
../../packages/fetchers: {}
../../packages/infra: {}
../../packages/storage: {}
../../packages/types: {}
../../packages/utils: {}
../bot: {}
../factory: {}
../mailbox: {}
../migrations: {}
../next: {}
../scout: {}
../strapi: {}
../uppy: {}
packages: packages:
'@aws-crypto/crc32@5.2.0': '@aws-crypto/crc32@5.2.0':

View File

@ -8,7 +8,7 @@
import { VodResponse } from "@futureporn/types" import { VodResponse } from "@futureporn/types"
import getVod from '@futureporn/fetchers/getVod.ts' import getVod from '@futureporn/fetchers/getVod.ts'
import { Duplex, PassThrough, Readable, type Writable } from "stream" import { PassThrough, Readable, type Writable } from "stream"
import { tmpdir } from 'node:os' import { tmpdir } from 'node:os'
import { join } from 'node:path' import { join } from 'node:path'
import { ua0 } from '@futureporn/utils/name.ts' import { ua0 } from '@futureporn/utils/name.ts'
@ -20,6 +20,7 @@ import { Upload, type Progress } from "@aws-sdk/lib-storage"
import { S3Client, type S3ClientConfig } from '@aws-sdk/client-s3' import { S3Client, type S3ClientConfig } from '@aws-sdk/client-s3'
import prettyBytes from "pretty-bytes" import prettyBytes from "pretty-bytes"
import updateSegmentInDatabase from "@futureporn/fetchers/updateSegmentInDatabase.ts" import updateSegmentInDatabase from "@futureporn/fetchers/updateSegmentInDatabase.ts"
import { getRecordingRelatedToVod } from "@futureporn/fetchers/getRecording.ts"
import createSegmentInDatabase from "@futureporn/fetchers/createSegmentInDatabase.ts" import createSegmentInDatabase from "@futureporn/fetchers/createSegmentInDatabase.ts"
import createSegmentsVodLink from "@futureporn/fetchers/createSegmentsVodLink.ts" import createSegmentsVodLink from "@futureporn/fetchers/createSegmentsVodLink.ts"
import { createReadStream, createWriteStream } from "fs" import { createReadStream, createWriteStream } from "fs"
@ -97,6 +98,9 @@ export default class RecordNextGeneration {
private uploadCounter: number; private uploadCounter: number;
private downloadCounter: number; private downloadCounter: number;
private databaseUpdateTimer?: NodeJS.Timeout; private databaseUpdateTimer?: NodeJS.Timeout;
private updateTimeout: number;
private abortController: AbortController;
constructor({ vodId, playlistUrl }: RecordNextGenerationArguments) { constructor({ vodId, playlistUrl }: RecordNextGenerationArguments) {
@ -104,13 +108,25 @@ export default class RecordNextGeneration {
this.playlistUrl = playlistUrl this.playlistUrl = playlistUrl
this.uploadCounter = 0 this.uploadCounter = 0
this.downloadCounter = 0 this.downloadCounter = 0
this.updateTimeout = 30*1000
this.abortController = new AbortController()
// const outputStream = createWriteStream('/dev/null') this.abortController.signal.addEventListener("abort", this.abortEventListener.bind(this))
// setInterval(() => { inputStream.push('simulated downloader bytes received') }, 50)
// setTimeout(() => { inputStream.destroy() })
} }
abortEventListener() {
console.log(`abortEventListener has been invoked. this.abortSignal is as follows`)
console.log(this.abortController.signal)
console.log(JSON.stringify(this.abortController.signal, null, 2))
const reason = this.abortController.signal.reason
if (this.downloadStream) {
console.log(`aborted the stream download with reason=${reason}`)
this.downloadStream.destroy(new AdminAbortedError())
} else {
console.warn(`downloadStream does not exist. Perhaps it has already been aborted?`)
}
}
getMultipartUpload({ getMultipartUpload({
client, client,
bucket, bucket,
@ -138,14 +154,13 @@ export default class RecordNextGeneration {
params params
}) })
/** /**
* aws client docs recommend against using async onProgress handlers. * aws client docs recommend against using async onProgress handlers.
* therefore, I'm only setting this.uploadCounter inside the syncronous handler and we call async updateSegmentInDatabase() elsewhere. * therefore, I'm only setting this.uploadCounter inside the syncronous handler and we call async updateSegmentInDatabase() elsewhere.
*/ */
const onProgress = (progress: Progress) => { const onProgress = (progress: Progress) => {
if (progress?.loaded) { if (progress?.loaded) {
console.log(`Progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`) console.log(`Upload progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`)
this.reportMemoryUsage() this.reportMemoryUsage()
this.uploadCounter = progress.loaded this.uploadCounter = progress.loaded
} }
@ -155,55 +170,23 @@ export default class RecordNextGeneration {
return upload return upload
} }
// static deleteme () {
// // @todo there is a problem that results in COMPLETE LOSS OF SEGMENT DATA. // @todo there is a problem that results in COMPLETE LOSS OF SEGMENT DATA.
// // when the download stream closes before the upload stream, I think the upload stream gets cut off. // when the download stream closes before the upload stream, I think the upload stream gets cut off.
// // this means the upload isn't allowed to save and that means no data whatsoever gets put to S3. // this means the upload isn't allowed to save and that means no data whatsoever gets put to S3.
// // is that right? IDK what's happening, but we don't get any segment data on S3 at all?? // is that right? IDK what's happening, but we don't get any segment data on S3 at all??
// // Ok I just checked the Backblaze dashboard and we are uploading. Backblaze says the bytes are at 0 but // Ok I just checked the Backblaze dashboard and we are uploading. Backblaze says the bytes are at 0 but
// // it also shows a partial upload of 550MB which matches what capture-worker is showing has been captured so far. // it also shows a partial upload of 550MB which matches what capture-worker is showing has been captured so far.
// // So I think what is happening is the upload is happening, but it's not finishing. // So I think what is happening is the upload is happening, but it's not finishing.
// // It looks like the finish is only allowed to happen under completely normal circumstances. // It looks like the finish is only allowed to happen under completely normal circumstances.
// // However, the segment upload may fail in production, and we need to let the upload finish even then. // However, the segment upload may fail in production, and we need to let the upload finish even then.
// // //
// // I think I need to call CompleteMultipartUpload. https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html // I think I need to call CompleteMultipartUpload. https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
// // Yes, that's right. Apparently parallelUploads3.done() returns a Promise which will resolve to CompleteMultipartUploadCommandOutput. // Yes, that's right. Apparently parallelUploads3.done() returns a Promise which will resolve to CompleteMultipartUploadCommandOutput.
// // But because of the catch, that promise will never resolve? // But because of the catch, that promise will never resolve?
// // What happens to an in-progress Promise when an error is thrown? // What happens to an in-progress Promise when an error is thrown?
//
// maybe @see https://github.com/aws/aws-sdk-js-v3/issues/2694 // maybe @see https://github.com/aws/aws-sdk-js-v3/issues/2694
// // await this.upload.done();
// // console.log('Upload is complete.')
// // return this.uploadStream
// } catch (e) {
// // if we got an abort error, e.name is not AbortError as expected. Instead, e.name is Error.
// // so in order to catch AbortError, we don't even look there. instead, we check if our abortcontroller was aborted.
// // in other words, `(e.name === 'AbortError')` will never be true.
// if (this.abortSignal.aborted) {
// console.error('While uploading, the upload was aborted.')
// setTimeout(async () => {
// await this.upload?.abort()
// }, 1000)
// // if (this.upload) {
// // const command = new CompleteMultipartUploadCommand()
// // }
// return;
// }
// if (e instanceof Error) {
// console.error(`We were uploading a file to S3 but then we encountered an exception!`)
// console.error(e)
// throw e
// } else {
// throw new Error(`error of some sort ${JSON.stringify(e, null, 2)}`)
// }
// }
// }
@ -296,8 +279,9 @@ export default class RecordNextGeneration {
*/ */
async getDatabaseRecords() { async getDatabaseRecords() {
this.vod = await getVod(this.vodId) this.vod = await getVod(this.vodId)
if (!this.vod) throw new Error(`RecordNextGeneration failed to fetch vod ${this.vodId}`) if (!this.vod) throw new Error(`RecordNextGeneration failed to fetch vod ${this.vodId}`);
if (this.vod.is_recording_aborted) throw new AdminAbortedError(); if (this.vod.recording.is_aborted) throw new AdminAbortedError();
if (!this.s3Key) throw new Error('getDatabaseRecords() requires this.s3Key, but it was undefined.'); if (!this.s3Key) throw new Error('getDatabaseRecords() requires this.s3Key, but it was undefined.');
this.segmentId = await createSegmentInDatabase(this.s3Key, this.vodId) this.segmentId = await createSegmentInDatabase(this.s3Key, this.vodId)
@ -355,97 +339,121 @@ export default class RecordNextGeneration {
await this.uploadInstance.done() await this.uploadInstance.done()
} }
startProgressReports() {
/** /**
* occasionalDatabaseRecordUpdater * # handleExceptions
* *
* We need to update the segment database record so the admin UI shows the progress of the recording. * We want to handle any exceptions that are thrown, so our process continues running.
* Here we use the byte count that was set by AWS S3 uploader to update the segment record. * Ideally we know every failure scenario and we handle it graceefully.
* Then, we queue another update 1 minute from the completion of this function. * If we ever reach the default: case below, it's a bug and we need to patch it.
*/ */
const occasionalDatabaseRecordUpdater = async () => { handleExceptions (e: any, phase?: string) {
console.log(`occasionalDatabaseRecordUpdater() is running now. downloadCounter=${this.downloadCounter} (${prettyBytes(this.downloadCounter)}), uploadCounter=${this.uploadCounter} (${prettyBytes(this.uploadCounter)})`) console.info(`handleExceptions is called during phase=${phase} with e.name=${e.name} e instanceof Error?=${e instanceof Error} e.message=${e.message}`)
this.reportMemoryUsage()
if (this.segmentId) { if (e instanceof Error && e.name === 'RoomOfflineError') {
await updateSegmentInDatabase({ segment_id: this.segmentId, fileSize: this.downloadCounter }) // if the room is offline, we re-throw the RoomOfflineError so the recording gets retried
} // we do this because the offline might be a temporary situation.
return setTimeout(occasionalDatabaseRecordUpdater, 60*1000) // e.g. streamer's computer bluescreened and they're coming back after they reboot.
throw e
} else if (e instanceof Error && e.name === 'AdminAbortedError') {
// An admin aborted the recording which means we don't want to retry recording.
// we return <void> which causes the 'record' Task to be marked as successful.
console.log(`clear as day, that is an AdminAbortedError! ❤️`)
return
} else if (e instanceof Error && e.name === 'DownloadFailedError') {
throw e
} else if (e instanceof Error && e.message === 'no tomes available') {
console.error(`Received a 'no tomes available' error from S3 which ususally means they're temporarily overloaded.`)
throw e
} else if (e instanceof Error && e.name === 'UploadFailedError') {
throw e
} else {
console.error(`!!!!!!!!!!!!!! 🚩🚩🚩 handleExceptions did not find a known scenario which should probably never happen. Please patch the code to handle this scenario.`)
console.error((e instanceof Error) ? `(e instanceof Error)=${(e instanceof Error)}, e.message='${e.message}', e.name='${e.name}'` : JSON.stringify(e))
} }
this.databaseUpdateTimer = setTimeout(occasionalDatabaseRecordUpdater, 60*1000)
} }
async updateDatabaseRecords() {
await this.updateSegmentBytes()
await this.checkForAborted()
}
async checkForAborted() {
const recording = await getRecordingRelatedToVod(this.vodId)
if (!recording) throw new Error(`failed to get recording related to vodId=${this.vodId}`);
if (recording.is_aborted) {
this.abortController.abort()
}
}
async updateSegmentBytes() {
console.log(`updateSegmentBytes() start with this.segmentId=${this.segmentId} this.downloadCounter=${this.downloadCounter}`)
if (this.segmentId) {
await updateSegmentInDatabase({ segment_id: this.segmentId, fileSize: this.downloadCounter })
}
}
startProgressReports() {
this.databaseUpdateTimer = setInterval(async () => {
try {
await this.updateDatabaseRecords()
} catch (e) {
console.error(`during startProgressReports(), we encountered the following error.`)
console.error(e)
}
}, this.updateTimeout)
}
stopProgressReports() { stopProgressReports() {
clearInterval(this.databaseUpdateTimer) clearInterval(this.databaseUpdateTimer)
} }
/** /**
* done() waits for the recording to be complete. * done() waits for the recording to be complete.
*/ */
async done() { async done() {
/**
* Errors thrown inside the setTimeout callback will end up as an uncaughtException.
* We handle those errors here to prevent node from exiting.
*/
process.on('uncaughtException', (e) => {
console.log(`!!! 🚩 WE HAVE CAUGHT AN UNCAUGHT EXCEPTION. (This should never occur. This is probably a bug that needs to be fixed.) error as follows.`)
console.log(e)
process.exit(69)
})
try { try {
this.startProgressReports() this.startProgressReports()
await this.download() await this.download()
} catch (e) { } catch (e) {
return this.handleExceptions(e, 'download')
switch (e) {
case (e instanceof Error && e.name === 'RoomOfflineError'):
// if the room is offline, we re-throw the RoomOfflineError so the recording gets retried
// we do this because the offline might be a temporary situation.
// e.g. streamer's computer bluescreened and they're coming back after they reboot.
throw e
case (e instanceof Error && e.name === 'AdminAbortedError'):
// An admin aborted which means we don't want to retry.
// we return and the Task gets marked as successful.
return
case (e instanceof Error && e.name === 'DownloadFailedError'):
throw e
default:
console.error(`!!!!!!!!!!!!!! switch/case (download section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
console.error(`!!!!!!!!!!!!!! switch/case (download section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
console.error(`!!!!!!!!!!!!!! switch/case (download section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
console.error((e instanceof Error) ? e.message : JSON.stringify(e))
}
} finally { } finally {
console.info(`🏁 finally block (👇 download phase)`)
// download is done, so we upload the segment to S3. // download is done, so we upload the segment to S3.
try { try {
await this.upload() await this.upload()
} catch (e) { } catch (e) {
return this.handleExceptions(e, 'upload')
switch (e) {
case (e instanceof Error && e.message === 'no tomes available'):
console.error(`Received a 'no tomes available' error from S3 which ususally means they're temporarily overloaded.`)
throw e
case (e instanceof Error && e.name === 'UploadFailedError'):
throw e
default:
console.error(`!!!!!!!!!!!!!! switch/case (upload section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
console.error(`!!!!!!!!!!!!!! switch/case (upload section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
console.error(`!!!!!!!!!!!!!! switch/case (upload section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
console.error((e instanceof Error) ? e.message : JSON.stringify(e))
}
} finally { } finally {
// @todo - [ ] send S3 complete upload command if necessary // @todo - [ ] send S3 complete upload command if necessary
console.info(`finally block. Cleaning up.`) console.info(`🏁 finally block (👆 upload phase)`)
this.stopProgressReports() this.stopProgressReports()
} }
} }

View File

@ -0,0 +1,28 @@
import { setInterval, setTimeout } from 'node:timers/promises';
async function task() {
console.log('Task is running:', new Date().toISOString());
await setTimeout(1000); // Simulating async work with 1 second delay
console.log('Task completed:', new Date().toISOString());
}
async function main() {
console.log('Main function started');
// Run the async function on an interval
(async () => {
for await (const _ of setInterval(5000)) { // 5 seconds interval
task(); // Running async task without waiting for it to complete
}
})();
// Non-blocking logic in the main function with a for loop
for (let i = 1; i <= 5; i++) {
console.log(`Main function loop iteration ${i}`);
await setTimeout(2000); // Simulate async work (non-blocking) with 2 seconds delay
}
console.log('Main function loop completed');
}
main();

View File

@ -1,97 +1,49 @@
import updateSegmentInDatabase from '@futureporn/fetchers/updateSegmentInDatabase.ts'
import { Helpers, type Task } from 'graphile-worker' import { Helpers, type Task } from 'graphile-worker'
import Record from '../Record.ts'
import type { SegmentResponse } from '@futureporn/types'
import { configs } from '../config.ts'
import { createId } from '@paralleldrive/cuid2'
import createSegmentInDatabase from '@futureporn/fetchers/createSegmentInDatabase.ts'
import createSegmentsVodLink from '@futureporn/fetchers/createSegmentsVodLink.ts'
import getPlaylistUrl from '@futureporn/fetchers/getPlaylistUrl.ts' import getPlaylistUrl from '@futureporn/fetchers/getPlaylistUrl.ts'
import getVod from '@futureporn/fetchers/getVod.ts'
import RecordNextGeneration from '../RecordNextGeneration.ts' import RecordNextGeneration from '../RecordNextGeneration.ts'
import createVod from '@futureporn/fetchers/createVod.ts'
import findOrCreateStream from '@futureporn/fetchers/findOrCreateStream.ts'
import findOrCreateVtuber from '@futureporn/fetchers/findOrCreateVtuber.ts'
import getRecording from '@futureporn/fetchers/getRecording.ts'
import patchRecording from '@futureporn/fetchers/patchRecording.ts'
/**
* url is the URL to be recorded. Ex: chaturbate.com/projektmelody
* recordId is the ID of the record record in postgres
* we use the ID to poll the db to see if the job is aborted by the user
*/
interface Payload { interface Payload {
url: string; recording_id: string;
vod_id: string; // url: string;
// discord_message_id?: string;
// vod_id?: string;
// date?: string;
} }
function assertPayload(payload: any): asserts payload is Payload { function assertPayload(payload: any): asserts payload is Payload {
if (typeof payload !== "object" || !payload) throw new Error("invalid payload"); if (typeof payload !== "object" || !payload) throw new Error("invalid payload");
if (typeof payload.url !== "string") throw new Error("invalid url"); if (typeof payload.recording_id !== "string") throw new Error("invalid recording_id");
if (typeof payload.vod_id !== "string") throw new Error(`invalid vod_id=${payload.vod_id}`); }
async function assertVod(payload: Payload) {
let { recording_id } = payload
const recording = await getRecording(recording_id)
if (!recording) throw new Error(`failed to getRecording id=${recording_id}`);
let { url, vod_id, date } = recording
if (vod_id) return { vodId: vod_id };
if (!date) date = new Date().toISOString();
const vtuberId = await findOrCreateVtuber({ url })
const streamId = await findOrCreateStream({ vtuberId, date: new Date(date) })
if (!streamId) throw new Error(`failed to find or create a Stream in database`);
const vod = await createVod({ stream_id: streamId, vtuber: vtuberId, url, date, recording_id })
if (!vod) throw new Error('failed to createVod. please try again.')
return { vodId: vod.id }
} }
// async function getRecordInstance(url: string, segment_id: string, helpers: Helpers) {
// helpers.logger.info(`getRecordInstance() with url=${url}, segment_id=${segment_id}`)
// const abortController = new AbortController()
// const abortSignal = abortController.signal
// const accessKeyId = configs.s3AccessKeyId;
// const secretAccessKey = configs.s3SecretAccessKey;
// const region = configs.s3Region;
// const endpoint = configs.s3Endpoint;
// const bucket = configs.s3UscBucket;
// const playlistUrl = await getPlaylistUrl(url)
// if (!playlistUrl) throw new Error('failed to getPlaylistUrl');
// helpers.logger.info(`playlistUrl=${playlistUrl}`)
// const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint })
// const inputStream = Record.getFFmpegStream({ url: playlistUrl })
// const onProgress = (fileSize: number) => {
// updateSegmentInDatabase({ segment_id, fileSize, helpers })
// .then(checkIfAborted)
// .then((isAborted) => {
// isAborted ? abortController.abort() : null
// })
// .catch((e) => {
// helpers.logger.error('caught error while updatingDatabaseRecord inside onProgress inside getRecordInstance')
// helpers.logger.error(e)
// })
// }
// const record = new Record({ inputStream, onProgress, bucket, s3Client, jobId: ''+segment_id, abortSignal })
// return record
// }
// function checkIfAborted(segment: Partial<SegmentResponse>): boolean {
// return (!!segment?.vod?.is_recording_aborted)
// }
/**
* # doRecordSegment
*
* Record a segment of a livestream using ffmpeg.
*
* Ideally, we record the entire livestream, but the universe is not so kind. Network interruptions are common, so we handle the situation as best as we can.
*
* This function creates a new segments and vods_segments_links entry in the db via Postgrest REST API.
*
* This function also names the S3 file (s3_key) with a datestamp and a cuid.
*/
// const doRecordSegment = async function doRecordSegment(url: string, vod_id: string, helpers: Helpers) {
// const s3_key = `${new Date().toISOString()}-${createId()}.ts`
// helpers.logger.info(`let's create a segment using vod_id=${vod_id}, url=${url}`)
// const segment_id = await createSegmentInDatabase(s3_key, vod_id)
// helpers.logger.info(`let's create a segmentsStreamLink...`)
// const segmentsVodLinkId = await createSegmentsVodLink(vod_id, segment_id)
// helpers.logger.info(`doTheRecording with createSegmentsVodLink segmentsVodLinkId=${segmentsVodLinkId}, vod_id=${vod_id}, segment_id=${segment_id}, url=${url}`)
// // no try-catch block here, because we need any errors to bubble up.
// const record = await getRecordInstance(url, segment_id)
// helpers.logger.info(`we got a Record instance. now we record.start()`)
// // console.log(record)
// return record.start()
// }
@ -99,11 +51,15 @@ export const record: Task = async function (payload: unknown, helpers: Helpers)
assertPayload(payload) assertPayload(payload)
const { url, vod_id } = payload const { recording_id: recordingId } = payload
const vodId = vod_id const recording = await getRecording(recordingId)
if (!recording) throw new Error(`failed to getRecording() ${recordingId}`);
const { url } = recording
const { vodId } = await assertVod(payload)
// await patchRecording(recording.id, { vod_id: vodId })
const playlistUrl = await getPlaylistUrl(url) const playlistUrl = await getPlaylistUrl(url)
if (!playlistUrl) throw new Error(`failed to get playlistUrl using url=${url}`) if (!playlistUrl) throw new Error(`failed to get playlistUrl using url=${url}`);
/** /**
* RecordNextGeneration handles errors for us and re-throws ones that should fail the Task. * RecordNextGeneration handles errors for us and re-throws ones that should fail the Task.
@ -115,49 +71,6 @@ export const record: Task = async function (payload: unknown, helpers: Helpers)
return; return;
// try {
// // if the VOD has been aborted, end Task with success
// if ((await getVod(vod_id, helpers))?.is_recording_aborted) return;
// /**
// * We do an exponential backoff timer when we record. If the Record() instance throws an error, we try again after a delay.
// * This will take effect only when Record() throws an error.
// * If however Record() returns, as is the case when the stream ends, this backoff timer will not retry.
// * This does not handle the corner case where the streamer's internet temporarliy goes down, and their stream drops.
// *
// * @todo We must implement retrying at a higher level, and retry a few times to handle this type of corner-case.
// */
// // await backOff(() => doRecordSegment(url, recordId, helpers))
// await doRecordSegment(url, vodId, helpers)
// } catch (e) {
// // await updateDatabaseRecord({ recordId: vod_id, recordingState: 'failed' })
// helpers.logger.error(`caught an error during record Task`)
// if (e instanceof Error) {
// helpers.logger.info(`error.name=${e.name}`)
// if (e.name === 'RoomOfflineError') {
// // If room is offline, we want to retry until graphile-worker retries expire.
// // We don't want to swallow the error so we simply log the error then let the below throw re-throw the error
// // graphile-worker will retry when we re-throw the error below.
// helpers.logger.info(`Room is offline.`)
// } else if (e.name === 'AbortError') {
// // If the recording was aborted by an admin, we want graphile-worker to stop retrying the record job.
// // We swallow the error and return in order to mark the job as succeeded.
// helpers.logger.info(`>>> we got an AbortError so we are ending the record job.`)
// return
// } else {
// helpers.logger.error(e.message)
// }
// } else {
// helpers.logger.error(JSON.stringify(e))
// }
// // we throw the error which fails the graphile-worker job, thus causing graphile-worker to restart/retry the job.
// helpers.logger.error(`we got an error during record Task so we throw and retry`)
// throw e
// }
// helpers.logger.info('record Task has finished')
} }

View File

@ -3,3 +3,9 @@
Factory takes raw materials (video segments) and produces an end product (encoded video, thumbnail) Factory takes raw materials (video segments) and produces an end product (encoded video, thumbnail)
factory has a big disk and lots of RAM in order to do transcoding tasks factory has a big disk and lots of RAM in order to do transcoding tasks
## 240p encodes
ffmpeg -i ./pmel-cb-2023-03-04.mp4 -vf scale=-2:240 -b:v 368k -b:a 45k ./projektmelody-chaturbatep2023-03-04_240p.mp4

View File

@ -0,0 +1,35 @@
-- recordings table schema
CREATE TABLE api.recordings (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
url TEXT NOT NULL,
created_at timestamp(6) without time zone,
updated_at timestamp(6) without time zone,
discord_message_id TEXT
);
-- roles & permissions for our backend automation user
GRANT all ON api.recordings TO automation;
GRANT SELECT ON api.recordings TO web_anon;
-- we re-create this function to use recording_id instead of vod_id
DROP FUNCTION public.tg__add_record_job CASCADE;
CREATE FUNCTION public.tg__add_record_job() RETURNS trigger
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'pg_catalog', 'public', 'pg_temp'
AS $$
begin
PERFORM graphile_worker.add_job('record', json_build_object(
'url', NEW.url,
'recording_id', NEW.id
), max_attempts := 6);
return NEW;
end;
$$;
CREATE TRIGGER create_recording
AFTER UPDATE ON api.recordings
FOR EACH ROW
EXECUTE FUNCTION tg__add_record_job();

View File

@ -0,0 +1,6 @@
ALTER TABLE IF EXISTS api.recordings
ALTER COLUMN created_at SET DEFAULT now();
ALTER TABLE IF EXISTS api.recordings
ALTER COLUMN updated_at SET DEFAULT now();

View File

@ -0,0 +1,8 @@
-- create discord_interactions table
CREATE TABLE api.discord_interactions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
discord_message_id TEXT NOT NULL
);
GRANT all ON api.discord_interactions TO automation;
GRANT SELECT ON api.discord_interactions TO web_anon;

View File

@ -0,0 +1,14 @@
-- add more cols to api.recordings
ALTER TABLE api.recordings
ADD COLUMN date TIMESTAMP(6) WITHOUT TIME ZONE;
ALTER TABLE api.recordings
ADD COLUMN vod_id UUID REFERENCES api.vods(id);
ALTER TABLE api.recordings
DROP COLUMN discord_message_id;
ALTER TABLE api.recordings
ADD COLUMN discord_interaction_id UUID REFERENCES api.discord_interactions(id);

View File

@ -0,0 +1,8 @@
DROP TRIGGER create_recording ON api.recordings;
CREATE TRIGGER recording_create
AFTER INSERT ON api.recordings
FOR EACH ROW
EXECUTE PROCEDURE public.tg__add_record_job('record');

View File

@ -0,0 +1,6 @@
-- move is_aborted COL from vods to recordings
ALTER TABLE api.vods
DROP COLUMN is_recording_aborted;
ALTER TABLE api.recordings
ADD COLUMN is_aborted BOOLEAN NOT NULL DEFAULT FALSE;

View File

@ -0,0 +1,3 @@
ALTER TABLE api.vods
ADD COLUMN segments UUID REFERENCES api.segments(id);

View File

@ -0,0 +1,29 @@
/*
we already have a many-to-one relation. this col is creating a one-to-many relation, causing the following problem.
{
"code": "PGRST201",
"details": [
{
"cardinality": "one-to-many",
"embedding": "segments with vods",
"relationship": "vods_segments_fkey using segments(id) and vods(segments)"
},
{
"cardinality": "many-to-one",
"embedding": "segments with vods",
"relationship": "segments_vod_id_fkey using segments(vod_id) and vods(id)"
}
],
"hint": "Try changing 'vods' to one of the following: 'vods!vods_segments_fkey', 'vods!segments_vod_id_fkey'. Find the desired relationship in the 'details' key.",
"message": "Could not embed because more than one relationship was found for 'segments' and 'vods'"
}
*/
ALTER TABLE api.vods
DROP COLUMN segments;

View File

@ -0,0 +1,27 @@
/*
we already have a many-to-one relation. this col is creating a one-to-many relation, causing the following problem.
{
"code": "PGRST201",
"details": [
{
"cardinality": "one-to-many",
"embedding": "segments with vods",
"relationship": "vods_segments_fkey using segments(id) and vods(segments)"
},
{
"cardinality": "many-to-one",
"embedding": "segments with vods",
"relationship": "segments_vod_id_fkey using segments(vod_id) and vods(id)"
}
],
"hint": "Try changing 'vods' to one of the following: 'vods!vods_segments_fkey', 'vods!segments_vod_id_fkey'. Find the desired relationship in the 'details' key.",
"message": "Could not embed because more than one relationship was found for 'segments' and 'vods'"
}
*/
ALTER TABLE api.recordings
DROP COLUMN vod_id;

View File

@ -0,0 +1,3 @@
ALTER TABLE api.recordings
ADD COLUMN vod_id UUID REFERENCES api.vods(id);

View File

@ -0,0 +1,3 @@
ALTER TABLE api.vods
ADD COLUMN recording_id UUID REFERENCES api.recordings(id);

View File

@ -0,0 +1,4 @@
-- potentially unecessary fk, because api.vods has the relation too.
ALTER TABLE api.recordings
DROP COLUMN vod_id;