Compare commits

...

2 Commits

Author     SHA1        Message                           Date
CJ_Clippy  5af878c3a3  refactor for simpler controlflow  2024-09-25 18:01:25 -08:00
                       (CI: build (push) failing after 1s)
CJ_Clippy  92cce429b3  capture basic function            2024-09-20 19:01:21 -08:00
36 changed files with 882 additions and 462 deletions

View File

@@ -238,12 +238,7 @@ docker_build(
load('ext://uibutton', 'cmd_button')
-cmd_button('postgres:create',
-  argv=['./scripts/postgres-create.sh'],
-  resource='postgresql-primary',
-  icon_name='dataset',
-  text='create (empty) databases',
-)
cmd_button('postgres:restore',
  argv=['./scripts/postgres-restore.sh'],
  resource='postgresql-primary',
@@ -257,18 +252,19 @@ cmd_button('postgres:drop',
  text='DROP all databases'
)
cmd_button('postgres:refresh',
-  argv=['sh', './scripts/postgres-refresh.sh'],
+  argv=['echo', '@todo please restart postgrest container manually.'],
  resource='migrations',
  icon_name='refresh',
  text='Refresh schema cache'
)
-# cmd_button('capture-api:create',
-#   argv=['http', '--ignore-stdin', 'POST', 'http://localhost:5003/api/record', "url='https://twitch.tv/ironmouse'", "channel='ironmouse'"],
-#   resource='capture-api',
-#   icon_name='send',
-#   text='Start Recording'
-# )
+## @todo let's make this get a random room from scout then use the random room to record via POST /recordings
+cmd_button('capture-worker:create',
+  argv=['./scripts/capture-integration.sh'],
+  resource='capture-worker',
+  icon_name='send',
+  text='Recording Integration Test'
+)
cmd_button('postgres:migrate',
  argv=['./scripts/postgres-migrations.sh'],
@@ -536,6 +532,10 @@ k8s_resource(
  labels=['backend'],
  resource_deps=['postgrest', 'postgresql-primary'],
)
+k8s_resource(
+  workload='chihaya',
+  labels=['backend']
+)
k8s_resource(
  workload='postgrest',
  port_forwards=['9000'],

View File

@@ -2,22 +2,6 @@
-# ---
-# apiVersion: v1
-# kind: Service
-# metadata:
-#   name: capture-api
-#   namespace: futureporn
-# spec:
-#   type: ClusterIP
-#   selector:
-#     app.kubernetes.io/name: capture
-#   ports:
-#     - name: http
-#       port: {{ .Values.capture.api.port }}
-#       targetPort: http
-#       protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
@@ -87,48 +71,3 @@ spec:
          memory: 1024Mi
      restartPolicy: Always
-# ---
-# apiVersion: apps/v1
-# kind: Deployment
-# metadata:
-#   name: capture-api
-#   namespace: futureporn
-#   labels:
-#     app.kubernetes.io/name: capture
-# spec:
-#   replicas: {{ .Values.capture.api.replicas }}
-#   selector:
-#     matchLabels:
-#       app: capture-api
-#   template:
-#     metadata:
-#       labels:
-#         app: capture-api
-#     spec:
-#       containers:
-#         - name: capture
-#           image: "{{ .Values.capture.imageName }}"
-#           ports:
-#             - name: http
-#               containerPort: {{ .Values.capture.api.port }}
-#           env:
-#             - name: FUNCTION
-#               value: api
-#             - name: HTTP_PROXY
-#               valueFrom:
-#                 secretKeyRef:
-#                   name: capture
-#                   key: httpProxy
-#             - name: WORKER_CONNECTION_STRING
-#               valueFrom:
-#                 secretKeyRef:
-#                   name: capture
-#                   key: workerConnectionString
-#             - name: PORT
-#               value: "{{ .Values.capture.api.port }}"
-#           resources:
-#             limits:
-#               cpu: 100m
-#               memory: 256Mi
-#       restartPolicy: Always

View File

@ -0,0 +1,29 @@
import { configs } from "./config.ts";

export default async function createRecording({ url, discordMessageId, date }: { url: string, discordMessageId?: string, date: Date }) {
  if (!url) throw new Error('createRecording requires {string} url');
  const payload = {
    url,
    discord_message_id: discordMessageId,
    date: date.toISOString()
  }
  const res = await fetch(`${configs.postgrestUrl}/recordings`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Accept': 'application/json',
      'Prefer': 'return=representation',
      'Authorization': `Bearer ${configs.automationUserJwt}`
    },
    body: JSON.stringify(payload)
  })
  if (!res.ok) {
    const body = await res.text()
    const msg = `Failed to create Recording. status=${res.status}, statusText=${res.statusText}, body=${body}`
    console.error(msg)
    throw new Error(msg);
  }
  const data = await res.json() as { id: string }[] // with Prefer: return=representation, PostgREST returns an array of created rows
  if (!data[0]) throw new Error('failed to createRecording! response body was empty.');
  return data[0].id
}
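For context, a minimal usage sketch of this fetcher (the stream URL and Discord message id are illustrative, not from the diff):

    import createRecording from '@futureporn/fetchers/createRecording.ts'

    const recordingId = await createRecording({
      url: 'https://chaturbate.com/projektmelody', // illustrative
      discordMessageId: '1234567890',              // optional; illustrative
      date: new Date()
    })
    console.log(`created recording id=${recordingId}`)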

View File

@ -0,0 +1,30 @@
import { SegmentResponse } from '@futureporn/types';
import { configs } from './config.ts'

export default async function createSegment(s3_key: string, vod_id: string): Promise<SegmentResponse> {
  if (!s3_key) throw new Error('createSegment requires {string} s3_key as first arg');
  const segmentPayload = {
    s3_key,
    vod_id
  }
  const res = await fetch(`${configs.postgrestUrl}/segments`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Accept': 'application/json',
      'Prefer': 'return=representation',
      'Authorization': `Bearer ${configs.automationUserJwt}`
    },
    body: JSON.stringify(segmentPayload)
  })
  if (!res.ok) {
    const body = await res.text()
    const msg = `failed to create Segment. status=${res.status}, statusText=${res.statusText}, body=${body}`
    console.error(msg)
    throw new Error(msg);
  }
  const data = await res.json() as SegmentResponse[]
  if (!data[0]) throw new Error('failed to createSegment! body[0] was missing.');
  return data[0]
}

View File

@ -1,37 +0,0 @@
import { configs } from './config.ts'
import querystring from 'node:querystring'

export default async function createSegmentInDatabase(s3_key: string, vod_id: string): Promise<string> {
  if (!s3_key) throw new Error('getSegments requires {string} s3_key as first arg');
  const segmentPayload = {
    s3_key,
    vod_id
  }
  // helpers.logger.info(`Creating segment with s3_key=${s3_key}. payload as follows`)
  // helpers.logger.info(JSON.stringify(segmentPayload))
  const res = await fetch(`${configs.postgrestUrl}/segments`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Accept': 'application/json',
      'Prefer': 'return=headers-only',
      'Authorization': `Bearer ${configs.automationUserJwt}`
    },
    body: JSON.stringify(segmentPayload)
  })
  if (!res.ok) {
    const body = await res.text()
    const msg = `failed to create Segment. status=${res.status}, statusText=${res.statusText}, body=${body}`
    console.error(msg)
    throw new Error(msg);
  }
  const location = res.headers.get('location')
  if (!location) throw new Error(`failed to get location header in response from postgrest`);
  const parsedQuery = querystring.parse(location)
  const segmentsId = parsedQuery['/segments?id']
  if (!segmentsId) throw new Error('segmentsId was undefined which is unexpected');
  if (Array.isArray(segmentsId)) throw new Error('segmentsId was an array which is unexpected');
  const id = segmentsId.split('.').at(-1)
  if (!id) throw new Error('failed to get id ');
  return id
}

View File

@@ -67,7 +67,7 @@ export async function createStream(): Promise<string|null> {
 *
 */
export default async function findOrCreateStream({ vtuberId, date, minutes = 15 }: { vtuberId: string, date: Date, minutes?: number }): Promise<string|null> {
-  console.info(`findOrCreateStream with vtuberId=${vtuberId}, date=${date.toISOString()}, minutes=${minutes}`)
+  console.info(`findOrCreateStream with vtuberId=${vtuberId}, date=${new Date(date).toISOString()}, minutes=${minutes}`)
  if (!vtuberId) throw new Error(`findOrCreateStream requires vtuberId passed in the options argument.`);
  if (!date) throw new Error(`findOrCreateStream requires date passed in the options argument.`);
  const gteDate = sub(date, { minutes })

View File

@@ -83,9 +83,9 @@ async function createVtuber(vtuber: Partial<VtuberRecord>): Promise<string> {
    throw new Error(msg)
  }
  const json = await res.json() as VtuberResponse[]
-  console.info(`createVtuber with vtuber as follows`)
-  console.info(vtuber)
-  console.info(json)
+  // console.info(`createVtuber with vtuber as follows`)
+  // console.info(vtuber)
+  // console.info(json)
  const vtuberData = json[0]
  if (!vtuberData) throw new Error('failed to createVtuber')
  return vtuberData.id
@@ -97,6 +97,7 @@ export default async function findOrCreateVtuber(query: Partial<vTuberSearchQuery
  const { url, name } = query
  if (!url) throw new Error('findOrCreateVtuber was missing url which is required');
  console.info(`findOrCreateVtuber. url=${url}, name=${name}`)
+  new URL(url) // validate URL, throw if invalid
  const foundVtuber = await findVtuber(query)
  if (!foundVtuber) {

View File

@@ -21,7 +21,5 @@ export default async function findVod({ vod_id, discord_message_id }: { vod_id?:
    throw new Error(msg)
  }
  const json = await res.json() as VodResponse[]
-  // console.info(`vod results as follows.`)
-  // console.info(json)
  return json?.at(0) || null
}

View File

@ -0,0 +1,45 @@
import type { RecordingResponse } from "@futureporn/types";
import { configs } from "./config.ts";

export default async function getRecording(recordingId: string): Promise<RecordingResponse|null> {
  if (!recordingId) throw new Error(`getRecording requires recordingId param, but it was missing.`);
  const fetchUrl = `${configs.postgrestUrl}/recordings?id=eq.${recordingId}`
  const fetchOptions = {
    method: 'GET',
    headers: {
      'Content-Type': 'application/json',
      'Accept': 'application/json',
      'Prefer': 'return=representation'
    }
  }
  const res = await fetch(fetchUrl, fetchOptions)
  if (!res.ok) {
    const msg = `request failed. status=${res.status}, statusText=${res.statusText}`
    console.error(msg)
    throw new Error(msg)
  }
  const json = await res.json() as RecordingResponse[]
  return json?.at(0) || null
}

export async function getRecordingRelatedToVod(vodId: string): Promise<RecordingResponse|null> {
  if (!vodId) throw new Error(`getRecordingRelatedToVod requires vodId param, but it was missing.`);
  const fetchUrl = `${configs.postgrestUrl}/recordings?select=*,vods!inner(id)&vods.id=eq.${vodId}`
  const fetchOptions = {
    method: 'GET',
    headers: {
      'Content-Type': 'application/json',
      'Accept': 'application/json',
      'Prefer': 'return=representation'
    }
  }
  const res = await fetch(fetchUrl, fetchOptions)
  if (!res.ok) {
    const msg = `request failed. status=${res.status}, statusText=${res.statusText}`
    console.error(msg)
    throw new Error(msg)
  }
  const json = await res.json() as RecordingResponse[]
  return json?.at(0) || null
}
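The `vods!inner(id)` clause is PostgREST's embedded-resource syntax: the inner join restricts results to recordings actually linked to the given vod. A hedged usage sketch (the vod id is illustrative):

    const recording = await getRecordingRelatedToVod('some-vod-id')
    if (recording?.is_aborted) {
      console.log("this vod's recording was aborted; skip further processing")
    }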

View File

@ -1,8 +1,8 @@
import { configs } from "./config" import { configs } from "./config"
import type { VodResponse } from "@futureporn/types" import type { VodResponse } from "@futureporn/types"
export default async function getVod(vodId: string) { export default async function getVod(vodId: string): Promise<VodResponse|null> {
const url = `${configs.postgrestUrl}/vods?select=*,segments(*)&id=eq.${vodId}` const url = `${configs.postgrestUrl}/vods?select=*,segments(*),recording:recordings(is_aborted)&id=eq.${vodId}`
try { try {
const res = await fetch(url) const res = await fetch(url)
if (!res.ok) { if (!res.ok) {
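The new `recording:recordings(is_aborted)` embed aliases the joined recordings row as `recording`, so callers can read the abort flag straight off the vod. A sketch of the intended consumer, assuming the VodResponse shape from @futureporn/types (the id is illustrative):

    const vod = await getVod('some-vod-id')
    if (vod?.recording?.is_aborted) {
      console.log('admin aborted this recording; stop work')
    }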

View File

@ -0,0 +1,28 @@
import type { RecordingRecord } from "@futureporn/types";
import { configs } from "./config.ts";

export default async function patchRecording(recordingId: string, payload: Partial<RecordingRecord>): Promise<void> {
  const url = `${configs.postgrestUrl}/recordings?id=eq.${recordingId}`
  const fetchOptions = {
    method: 'PATCH',
    headers: {
      'Authorization': `Bearer ${configs.automationUserJwt}`,
      'Content-Type': 'application/json',
      'Prefer': 'return=headers-only'
    },
    body: JSON.stringify(payload)
  }
  try {
    const res = await fetch(url, fetchOptions)
    if (!res.ok) {
      const body = await res.json()
      console.error(body)
      throw new Error(`Problem during patchRecording. res.status=${res.status}, res.statusText=${res.statusText}`)
    }
    return
  } catch (e) {
    console.error(e)
    throw e
  }
}
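A minimal usage sketch, mirroring how the /cancel command flips the abort flag (the id is illustrative):

    import patchRecording from '@futureporn/fetchers/patchRecording.ts'

    await patchRecording('42', { is_aborted: true })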

View File

@@ -21,8 +21,8 @@ export default async function updateSegmentInDatabase({
    bytes: fileSize
  }

-  const fetchUrl = `${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=vod:vods(is_recording_aborted)`
-  console.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`)
+  const fetchUrl = `${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=vod:vods(recording:recordings(is_aborted))`
+  // console.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`)
  const res = await fetch(fetchUrl, {
    method: 'PATCH',
    headers: {

View File

@@ -120,6 +120,7 @@ export interface VodRecord {
  url: string;
  discord_message_id: string;
  s3_file: string;
+  recording_id: string;
}

export interface TagRecord {
@@ -134,6 +135,15 @@ export interface User {
  id: string
}

+export interface RecordingResponse {
+  id: string;
+  url: string;
+  date: string;
+  discord_interaction_id: string;
+  is_aborted: boolean;
+  vod_id: string;
+}
+
export interface VodResponse {
  id: string;
  stream: StreamResponse;
@@ -157,9 +167,9 @@ export interface VodResponse {
  note?: string;
  url: string;
  segments?: SegmentResponse[];
+  recording: RecordingRecord;
  status: Status;
  discord_message_id: string;
-  is_recording_aborted: boolean;
}

@@ -177,7 +187,6 @@ export interface StreamRecord {
  archive_status: ArchiveStatus;
  is_chaturbate_stream: Boolean;
  is_fansly_stream: Boolean;
-  is_recording_aborted: Boolean;
  status: Status;
  segments?: SegmentResponse[]
}

@@ -203,16 +212,17 @@ export interface StreamResponse {
export interface RecordingRecord {
  id: number;
-  recording_state: RecordingState;
  file_size: number;
  discord_message_id: string;
-  is_recording_aborted: boolean;
+  is_aborted: boolean;
+  vod_id: string;
  updated_at: Date;
  created_at: Date;
}

export interface SegmentResponse {
-  id: number;
+  id: string;
  s3_key: string;
  s3_id: string;
  bytes: number;

View File

@@ -30,7 +30,7 @@ describe('image', function () {
  describe('getStoryboard', function () {
    this.timeout(1000*60*15)
    it('should accept a URL and return a path to image on disk', async function () {
-      const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-09-15.mp4'
+      const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-09-25.mp4'
      const imagePath = await getStoryboard(url)
      expect(imagePath).to.match(/\.png/)
    })

scripts/capture-integration.sh (new executable file, 25 lines)
View File

@ -0,0 +1,25 @@
#!/bin/bash
if [ -z "${AUTOMATION_USER_JWT}" ]; then
  echo "Error: AUTOMATION_USER_JWT variable is not defined."
  exit 1
fi

# get a random room
response=$(curl -sL --fail http://localhost:8134/chaturbate/random-room)
exitcode=$?
url=$(echo "$response" | jq -r '.url')
if [[ $exitcode -ne 0 || -z "$response" || -z "$url" || "$url" = "null" ]]; then
  echo "failed to get random room. exitcode=${exitcode}, response=${response}, url=${url}"
  exit 1
fi

echo "Random online chaturbate room url=${url}"

# create a recording
curl -sL -H "Authorization: Bearer ${AUTOMATION_USER_JWT}" \
  -H "Content-Type: application/json" \
  -d '{"url": "'"${url}"'"}' \
  http://localhost:9000/recordings

echo "finished creating recording"

View File

@@ -16,7 +16,7 @@ createCommand({
    if (!message) return bot.logger.error('interaction.message was missing');
    if (!message.id) return bot.logger.error(`interaction.message.id was missing`);
-    const url = `${configs.postgrestUrl}/vods?discord_message_id=eq.${message.id}`;
+    const url = `${configs.postgrestUrl}/recordings?discord_message_id=eq.${message.id}`;
    const options = {
      method: 'PATCH',
      headers: {
@@ -26,7 +26,7 @@ createCommand({
        'Authorization': `Bearer ${configs.automationUserJwt}`
      },
      body: JSON.stringify({
-        is_recording_aborted: true,
+        is_aborted: true,
        status: 'aborted' as Status
      })
    };

View File

@@ -13,6 +13,7 @@ import type { StreamResponse } from '@futureporn/types'
import createVod from '@futureporn/fetchers/createVod.ts'
import findOrCreateVtuber from '@futureporn/fetchers/findOrCreateVtuber.ts'
import findOrCreateStream from '@futureporn/fetchers/findOrCreateStream.ts'
+import createRecording from '@futureporn/fetchers/createRecording.ts'

/**
@@ -114,13 +115,10 @@ createCommand({
      }
      const discord_message_id = message.id.toString()
+      const discordMessageId = discord_message_id
      const date = new Date()
-      const vtuberId = await findOrCreateVtuber({ url })
-      const streamId = await findOrCreateStream({ vtuberId, date })
-      if (!streamId) throw new Error(`failed to find or create a Stream in database`);
-      const vod = await createVod({ stream_id: streamId, vtuber: vtuberId, url, discord_message_id, date: date.toISOString() })
-      if (!vod) throw new Error('failed to createVod. please try again.')
-      logger.info(`Success! We have created VOD id=${vod.id}`)
+      await createRecording({ url, discordMessageId, date })
    } catch (e) {
      const message = `Record failed due to the following error.\n${e}`

View File

@ -1,5 +1,13 @@
Task names use underscores because graphile_worker expects them that way: graphile_worker interfaces with PostgreSQL, which uses lowercase names and underscores.

Here are some administrative functions for clearing all tasks. Also see https://worker.graphile.org/docs/admin-functions
(search tags, for easily finding this file by content)
administrative tasks
clear all
delete all
jobs
addJob()
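Jobs can also be added from Node using graphile-worker's WorkerUtils; a minimal sketch (the task name and payload are illustrative):

    import { makeWorkerUtils } from 'graphile-worker'

    const workerUtils = await makeWorkerUtils({ connectionString: process.env.DATABASE_URL })
    await workerUtils.addJob('record', { recording_id: 'some-recording-id' })
    await workerUtils.release()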
## Add job via SQL

View File

@@ -199,6 +199,34 @@ importers:
      specifier: ^5.5.4
      version: 5.5.4

+  ../..: {}
+  ../../packages/fetchers: {}
+  ../../packages/infra: {}
+  ../../packages/storage: {}
+  ../../packages/types: {}
+  ../../packages/utils: {}
+  ../bot: {}
+  ../factory: {}
+  ../mailbox: {}
+  ../migrations: {}
+  ../next: {}
+  ../scout: {}
+  ../strapi: {}
+  ../uppy: {}
+
packages:

  '@aws-crypto/crc32@5.2.0':

View File

@@ -1,14 +1,10 @@
/**
 * RecordNextGeneration.ts
- *
- * @important @todo There is a MEMORY LEAK in here somewhere!
- * This causes OOMKiller to cull the capture process. Not good!
- *
 */
import { VodResponse } from "@futureporn/types"
import getVod from '@futureporn/fetchers/getVod.ts'
-import { Duplex, PassThrough, Readable, type Writable } from "stream"
+import { PassThrough, Readable, type Writable } from "stream"
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { ua0 } from '@futureporn/utils/name.ts'
@@ -20,13 +16,18 @@ import { Upload, type Progress } from "@aws-sdk/lib-storage"
import { S3Client, type S3ClientConfig } from '@aws-sdk/client-s3'
import prettyBytes from "pretty-bytes"
import updateSegmentInDatabase from "@futureporn/fetchers/updateSegmentInDatabase.ts"
-import createSegmentInDatabase from "@futureporn/fetchers/createSegmentInDatabase.ts"
+import { getRecordingRelatedToVod } from "@futureporn/fetchers/getRecording.ts"
+import createSegment from "@futureporn/fetchers/createSegment.ts"
import createSegmentsVodLink from "@futureporn/fetchers/createSegmentsVodLink.ts"
import { createReadStream, createWriteStream } from "fs"
+import pRetry, { AbortError } from 'p-retry'
+import { type SegmentResponse } from '@futureporn/types'
+import getPlaylistUrl from "@futureporn/fetchers/getPlaylistUrl.ts"
+import { isBefore, sub } from 'date-fns'

export interface RecordNextGenerationArguments {
  vodId: string;
-  playlistUrl: string;
+  url: string;
}
export class AdminAbortedError extends Error {
@@ -54,6 +55,20 @@ export class UploadFailedError extends Error {
  }
}

+export class PlaylistFailedError extends Error {
+  constructor(message?: string) {
+    super(message)
+    Object.setPrototypeOf(this, PlaylistFailedError.prototype)
+    this.name = this.constructor.name
+    this.message = `PlaylistFailedError. ${this.message}`
+  }
+  getErrorMessage() {
+    return this.message
+  }
+}
+
export class DownloadFailedError extends Error {
  constructor(message?: string) {
    super(message)
@@ -68,22 +83,33 @@ export class DownloadFailedError extends Error {
/**
- * RecordNextGeneration
+ * # RecordNextGeneration
 *
- * The function which records VODs in a Futureporn specific way.
+ * The function which records VODs in a Futureporn specific, fault-tolerant way.
 *
- * Issues
+ * ## Issues/TODO list
 *
 * @todo [x] onProgress stops firing
- * @todo [ ] OOMKilled seen via development environment
+ * @todo [x] OOMKilled seen via development environment
 * @todo [ ] undefined behavior during CB private shows
+ * @todo [ ] does not handle CB Hidden Camera ticket shows
+ * @todo [ ] Upload segments in a way that does not interrupt downloading new segments.
+ *           There is an issue where a segment download ends, and the segment upload immediately begins.
+ *           At first glance this looks like good behavior, but what is happening during the segment upload is that the livestream
+ *           is continuing, but we aren't recording it anymore. We are using Backblaze, thus uploads are slow.
+ *           We miss a lot of the stream because the upload takes many minutes.
+ *           Instead of this behavior of immediately uploading after a segment download completes, we should upload once the livestream is finished,
+ *           OR we should upload while concurrently downloading the next segment.
+ * @todo [ ] Move retrying from the {Task} `record` context to the class `RecordNextGeneration` context.
+ *           There is an issue where the `record` task needs to retry after a temporary failure, but it cannot because there aren't any available workers.
+ *           The solution is to not exit the `record` task at all, and instead keep the `record` task running, but suspended while an exponential backoff timer elapses.
+ *           This way, the worker stays focused on the recording and retries until the stream has been offline for n minutes, at which point `record` is complete.
+ *
 */
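To make the upload-concurrency TODO above concrete, here is a hedged sketch of overlapping each segment's upload with the next segment's download. downloadOne/uploadOne are hypothetical stand-ins for this class's downloadSegment()/uploadSegment() (assuming downloadSegment were adjusted to return its finished segment):

    import type { SegmentResponse } from '@futureporn/types'

    async function recordWithOverlappedUploads(
      downloadOne: () => Promise<SegmentResponse>,
      uploadOne: (s: SegmentResponse) => Promise<void>,
      streamStillLive: () => boolean,
    ) {
      const pendingUploads: Promise<void>[] = []
      while (streamStillLive()) {
        const segment = await downloadOne()      // blocks until this segment's download ends
        pendingUploads.push(uploadOne(segment))  // intentionally not awaited: the next download starts immediately
      }
      // the stream is over; settle every upload and surface failures
      const results = await Promise.allSettled(pendingUploads)
      const failed = results.filter((r) => r.status === 'rejected')
      if (failed.length) throw new Error(`${failed.length} segment upload(s) failed`)
    }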
export default class RecordNextGeneration {

  public vodId: string;
-  public segmentId?: string;
-  public segmentVodLinkId?: string;
-  public playlistUrl: string;
+  public url: string;
  public s3Key?: string;
  public s3Bucket?: string;
  public s3Client?: S3Client;
@@ -92,25 +118,52 @@ export default class RecordNextGeneration {
  private downloadStream?: Readable;
  private uploadStream?: PassThrough;
  private uploadInstance?: Upload;
-  private streamPipeline?: Promise<void>;
  private diskStream?: Writable;
  private uploadCounter: number;
  private downloadCounter: number;
  private databaseUpdateTimer?: NodeJS.Timeout;
+  private updateTimeout: number;
+  private abortController: AbortController;
+  private segments: SegmentResponse[];
+  private retries: number;

-  constructor({ vodId, playlistUrl }: RecordNextGenerationArguments) {
+  constructor({ vodId, url }: RecordNextGenerationArguments) {
    this.vodId = vodId
-    this.playlistUrl = playlistUrl
+    this.url = url
    this.uploadCounter = 0
    this.downloadCounter = 0
-    // const outputStream = createWriteStream('/dev/null')
-    // setInterval(() => { inputStream.push('simulated downloader bytes received') }, 50)
-    // setTimeout(() => { inputStream.destroy() })
+    this.updateTimeout = 30*1000
+    this.abortController = new AbortController()
+    this.abortController.signal.addEventListener("abort", this.abortEventListener.bind(this))
+    this.retries = 0
+    this.segments = []
  }

+  async withRetry(fn: any, retries = 3) {
+    return pRetry(fn, {
+      onFailedAttempt: (e) => {
+        console.error(`Error during attempt:`, e);
+      },
+      retries
+    });
+  }
+
+  abortEventListener() {
+    console.log(`abortEventListener has been invoked. this.abortSignal is as follows`)
+    console.log(this.abortController.signal)
+    console.log(JSON.stringify(this.abortController.signal, null, 2))
+    const reason = this.abortController.signal.reason
+    if (this.downloadStream) {
+      console.log(`aborted the stream download with reason=${reason}`)
+      this.downloadStream.destroy(new AdminAbortedError())
+    } else {
+      console.warn(`downloadStream does not exist. Perhaps it has already been aborted?`)
+    }
+  }
  getMultipartUpload({
    client,
    bucket,
@@ -138,14 +191,13 @@ export default class RecordNextGeneration {
      params
    })

    /**
     * aws client docs recommend against using async onProgress handlers.
     * therefore, I'm only setting this.uploadCounter inside the synchronous handler and we call async updateSegmentInDatabase() elsewhere.
     */
    const onProgress = (progress: Progress) => {
      if (progress?.loaded) {
-        console.log(`Progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`)
+        console.log(`Upload progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`)
        this.reportMemoryUsage()
        this.uploadCounter = progress.loaded
      }
@@ -155,64 +207,35 @@ export default class RecordNextGeneration {
    return upload
  }
-  // static deleteme () {
-  //   // @todo there is a problem that results in COMPLETE LOSS OF SEGMENT DATA.
-  //   // when the download stream closes before the upload stream, I think the upload stream gets cut off.
-  //   // this means the upload isn't allowed to save and that means no data whatsoever gets put to S3.
-  //   // is that right? IDK what's happening, but we don't get any segment data on S3 at all??
-  //   // Ok I just checked the Backblaze dashboard and we are uploading. Backblaze says the bytes are at 0 but
-  //   // it also shows a partial upload of 550MB which matches what capture-worker is showing has been captured so far.
-  //   // So I think what is happening is the upload is happening, but it's not finishing.
-  //   // It looks like the finish is only allowed to happen under completely normal circumstances.
-  //   // However, the segment upload may fail in production, and we need to let the upload finish even then.
-  //   //
-  //   // I think I need to call CompleteMultipartUpload. https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
-  //   // Yes, that's right. Apparently parallelUploads3.done() returns a Promise which will resolve to CompleteMultipartUploadCommandOutput.
-  //   // But because of the catch, that promise will never resolve?
-  //   // What happens to an in-progress Promise when an error is thrown?
-  //   // maybe @see https://github.com/aws/aws-sdk-js-v3/issues/2694
-  //   // await this.upload.done();
-  //   // console.log('Upload is complete.')
-  //   // return this.uploadStream
-  //   } catch (e) {
-  //     // if we got an abort error, e.name is not AbortError as expected. Instead, e.name is Error.
-  //     // so in order to catch AbortError, we don't even look there. instead, we check if our abortcontroller was aborted.
-  //     // in other words, `(e.name === 'AbortError')` will never be true.
-  //     if (this.abortSignal.aborted) {
-  //       console.error('While uploading, the upload was aborted.')
-  //       setTimeout(async () => {
-  //         await this.upload?.abort()
-  //       }, 1000)
-  //       // if (this.upload) {
-  //       //   const command = new CompleteMultipartUploadCommand()
-  //       // }
-  //       return;
-  //     }
-  //     if (e instanceof Error) {
-  //       console.error(`We were uploading a file to S3 but then we encountered an exception!`)
-  //       console.error(e)
-  //       throw e
-  //     } else {
-  //       throw new Error(`error of some sort ${JSON.stringify(e, null, 2)}`)
-  //     }
-  //   }
-  // }
-  static getFFmpegStream({ url }: { url: string }): Readable {
-    console.log(`getFFmpegStream using url=${url}`)
+  // @todo there is a problem that results in COMPLETE LOSS OF SEGMENT DATA.
+  // when the download stream closes before the upload stream, I think the upload stream gets cut off.
+  // this means the upload isn't allowed to save and that means no data whatsoever gets put to S3.
+  // is that right? IDK what's happening, but we don't get any segment data on S3 at all??
+  // Ok I just checked the Backblaze dashboard and we are uploading. Backblaze says the bytes are at 0 but
+  // it also shows a partial upload of 550MB which matches what capture-worker is showing has been captured so far.
+  // So I think what is happening is the upload is happening, but it's not finishing.
+  // It looks like the finish is only allowed to happen under completely normal circumstances.
+  // However, the segment upload may fail in production, and we need to let the upload finish even then.
+  //
+  // I think I need to call CompleteMultipartUpload. https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
+  // Yes, that's right. Apparently parallelUploads3.done() returns a Promise which will resolve to CompleteMultipartUploadCommandOutput.
+  // But because of the catch, that promise will never resolve?
+  // What happens to an in-progress Promise when an error is thrown?
+  //
+  // maybe @see https://github.com/aws/aws-sdk-js-v3/issues/2694

+  static getDiskStream(s3Key?: string) {
+    const tmpDiskPath = join(tmpdir(), s3Key || `${nanoid()}.ts`)
+    return createWriteStream(tmpDiskPath, { encoding: 'utf-8' })
+  }
+
+  static getFFmpegStream({ playlistUrl }: { playlistUrl: string }): Readable {
+    console.log(`getFFmpegStream using playlistUrl=${playlistUrl}`)
    const ffmpegProc = spawn('ffmpeg', [
      '-headers', `"User-Agent: ${ua0}"`,
-      '-i', url,
+      '-i', playlistUrl,
      '-c:v', 'copy',
      '-c:a', 'copy',
      '-movflags', 'faststart',
@@ -227,6 +250,14 @@ export default class RecordNextGeneration {
    return ffmpegProc.stdout
  }

+  onUploadProgress(progress: Progress) {
+    if (progress?.loaded) {
+      console.log(`Upload progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`)
+      this.reportMemoryUsage()
+      this.uploadCounter = progress.loaded
+    }
+  }
+
  formatMemoryStats(stats: NodeJS.MemoryUsage): Record<string, string> {

    const formattedStats: Record<string, string> = {};
} }
getNames() { getNames() {
const tmpFileName = `${nanoid()}.ts` const s3Key = `${nanoid()}.ts`
this.s3Key = tmpFileName const tmpDiskPath = join(tmpdir(), s3Key)
this.tmpDiskPath = join(tmpdir(), tmpFileName) console.log(`tmpDiskPath=${tmpDiskPath}`)
console.log(`tmpDiskPath=${this.tmpDiskPath}`) return { tmpDiskPath, s3Key }
return { tmpDiskPath: this.tmpDiskPath, s3Key: this.s3Key }
} }
  /**
@@ -294,44 +324,60 @@ export default class RecordNextGeneration {
   * * segment
   * * segment_vod_link
   */
-  async getDatabaseRecords() {
-    this.vod = await getVod(this.vodId)
-    if (!this.vod) throw new Error(`RecordNextGeneration failed to fetch vod ${this.vodId}`)
-    if (this.vod.is_recording_aborted) throw new AdminAbortedError();
-    if (!this.s3Key) throw new Error('getDatabaseRecords() requires this.s3Key, but it was undefined.');
-    this.segmentId = await createSegmentInDatabase(this.s3Key, this.vodId)
-    this.segmentVodLinkId = await createSegmentsVodLink(this.vodId, this.segmentId)
-    if (!this.vod) throw new Error('after getRecords() ran, this.vod was missing.');
-    if (!this.segmentId) throw new Error('after getRecords() ran, this.segmentId was missing.');
-    if (!this.segmentVodLinkId) throw new Error('after getRecords() ran, this.segmentVodLinkId was missing.');
-  }
+  // async getDatabaseRecords() {
+  //   this.vod = await getVod(this.vodId)
+  //   if (!this.vod) throw new Error(`RecordNextGeneration failed to fetch vod ${this.vodId}`);
+  //   if (this.vod.recording.is_aborted) throw new AdminAbortedError();
+  //   if (!this.s3Key) throw new Error('getDatabaseRecords() requires this.s3Key, but it was undefined.');
+  //   const segmentId = await createSegment(this.s3Key, this.vodId)
+  //   const segmentVodLinkId = await createSegmentsVodLink(this.vodId, this.segmentId)
+  //   if (!this.vod) throw new Error('after getDatabaseRecords() ran, this.vod was missing.');
+  //   if (!segmentId) throw new Error('after getDatabaseRecords() ran, segmentId was missing.');
+  //   if (!segmentVodLinkId) throw new Error('after getDatabaseRecords() ran, segmentVodLinkId was missing.');
+  //   return { segmentId, segmentVodLinkId }
+  // }
+
+  static async _dl(url: string, s3Key: string) {
+    const playlistUrl = await getPlaylistUrl(url)
+    if (!playlistUrl) throw new PlaylistFailedError();
+
+    const ffmpegStream = RecordNextGeneration.getFFmpegStream({ playlistUrl })
+    const diskStream = RecordNextGeneration.getDiskStream(s3Key)
+
+    const streamPipeline = pipeline(ffmpegStream, diskStream)
+    return {
+      pipeline: streamPipeline,
+      ffmpegStream,
+      diskStream,
+    }
+  }
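Taken together, _dl stages a segment on disk and _ul ships it to S3; a hedged composition sketch (the URL, key, and client config are illustrative):

    import { S3Client } from '@aws-sdk/client-s3'
    import { join } from 'node:path'
    import { tmpdir } from 'node:os'

    const s3Key = 'example.ts'
    const { pipeline } = await RecordNextGeneration._dl('https://chaturbate.com/example', s3Key)
    await pipeline // resolves when ffmpeg finishes writing to disk
    const client = new S3Client({ region: 'us-west-000' }) // illustrative config
    const { uploadInstance } = await RecordNextGeneration._ul(client, join(tmpdir(), s3Key), s3Key)
    await uploadInstance.done()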
-  async download() {
-    const { tmpDiskPath } = this.getNames()
-    const s3Client = this.getS3Client()
-    await this.getDatabaseRecords()
-    this.downloadStream = RecordNextGeneration.getFFmpegStream({ url: this.playlistUrl })
-    this.diskStream = createWriteStream(tmpDiskPath, { encoding: 'utf-8' })
-    this.streamPipeline = pipeline(this.downloadStream, this.diskStream)
-    this.downloadStream.on('data', (data: any) => this.downloadCounter += data.length)
-    this.downloadStream.on('close', (arg: any) => console.log(`RecordNextGeneration downloadStream close. arg=${arg}`))
-    this.downloadStream.on('end', (arg: any) => console.log(`RecordNextGeneration downloadStream end. arg=${arg}`))
-    this.downloadStream.on('drain', (arg: any) => console.log(`RecordNextGeneration downloadStream drain. arg=${arg}`))
-    // this.downloadStream.on('pause', (arg: any) => console.log(`RecordNextGeneration downloadStream pause. arg=${arg}`))
-    this.downloadStream.on('error', (arg: any) => console.log(`RecordNextGeneration downloadStream error. arg=${arg}`))
-    this.diskStream.on('close', (arg: any) => console.log(`RecordNextGeneration diskStream close. arg=${arg}`))
-    this.diskStream.on('end', (arg: any) => console.log(`RecordNextGeneration diskStream end. arg=${arg}`))
-    // this.diskStream.on('drain', (arg: any) => console.log(`RecordNextGeneration diskStream drain. arg=${arg}`))
-    this.diskStream.on('pause', (arg: any) => console.log(`RecordNextGeneration diskStream pause. arg=${arg}`))
-    this.diskStream.on('error', (arg: any) => console.log(`RecordNextGeneration diskStream error. arg=${arg}`))
-    await this.streamPipeline
-  }
+  static async _ul(client: S3Client, diskPath: string, key: string) {
+    const diskStream = createReadStream(diskPath, { encoding: 'utf-8' })
+
+    const params = {
+      Bucket: configs.s3UscBucket,
+      Key: key,
+      Body: diskStream
+    }
+
+    const uploadInstance = new Upload({
+      client,
+      partSize: 1024 * 1024 * 5,
+      queueSize: 1,
+      // @see https://github.com/aws/aws-sdk-js-v3/issues/2311
+      // tl;dr: the variable name, 'leavePartsOnError' is not representative of the behavior.
+      // It should instead be interpreted as, 'throwOnPartsError'
+      leavePartsOnError: true,
+      params
+    })
+
+    return {
+      uploadInstance,
+      diskStream,
+    }
+  }
  async upload() {
@@ -343,112 +389,295 @@ export default class RecordNextGeneration {
    if (!tmpDiskPath) throw new Error('tmpDiskPath was missing during upload()');
    const fileStream = createReadStream(tmpDiskPath, { encoding: 'utf-8' })
-    // this.uploadStream = new PassThrough()
    this.uploadInstance = this.getMultipartUpload({ client: s3Client, bucket: configs.s3UscBucket, key: s3Key, body: fileStream })
-    // this.uploadInstance.on('close', (arg: any) => console.log(`RecordNextGeneration uploadStream close. arg=${arg}`))
-    // this.uploadInstance.on('end', (arg: any) => console.log(`RecordNextGeneration uploadStream end. arg=${arg}`))
-    // this.uploadInstance.on('drain', (arg: any) => console.log(`RecordNextGeneration uploadStream drain. arg=${arg}`))
-    // this.uploadInstance.on('pause', (arg: any) => console.log(`RecordNextGeneration uploadStream pause. arg=${arg}`))
-    // this.uploadInstance.on('error', (arg: any) => console.log(`RecordNextGeneration uploadStream error. arg=${arg}`))
    await this.uploadInstance.done()
  }
-  startProgressReports() {
-    /**
-     * occasionalDatabaseRecordUpdater
-     *
-     * We need to update the segment database record so the admin UI shows the progress of the recording.
-     * Here we use the byte count that was set by AWS S3 uploader to update the segment record.
-     * Then, we queue another update 1 minute from the completion of this function.
-     */
-    const occasionalDatabaseRecordUpdater = async () => {
-      console.log(`occasionalDatabaseRecordUpdater() is running now. downloadCounter=${this.downloadCounter} (${prettyBytes(this.downloadCounter)}), uploadCounter=${this.uploadCounter} (${prettyBytes(this.uploadCounter)})`)
-      this.reportMemoryUsage()
-      if (this.segmentId) {
-        await updateSegmentInDatabase({ segment_id: this.segmentId, fileSize: this.downloadCounter })
-      }
-      return setTimeout(occasionalDatabaseRecordUpdater, 60*1000)
-    }
-    this.databaseUpdateTimer = setTimeout(occasionalDatabaseRecordUpdater, 60*1000)
-  }
+  /**
+   * # handleExceptions
+   *
+   * We want to handle any exceptions that are thrown, so our process continues running.
+   * Ideally we know every failure scenario and we handle it gracefully.
+   * If we ever reach the default: case below, it's a bug and we need to patch it.
+   */
+  handleExceptions (e: any, phase?: string) {
+    console.info(`handleExceptions is called during phase=${phase} with e.name=${e.name} e instanceof Error?=${e instanceof Error} e.message=${e.message}`)
+
+    if (e instanceof Error && e.name === 'RoomOfflineError') {
+      // if the room is offline, we re-throw the RoomOfflineError so the recording gets retried
+      // we do this because the offline might be a temporary situation.
+      // e.g. streamer's computer bluescreened and they're coming back after they reboot.
+      // @todo try again immediately
+    } else if (e instanceof Error && e.name === 'PlaylistFailedError') {
+      // sometimes @futureporn/scout fails to get the playlist URL. We want to immediately try again.
+      // @todo try again immediately
+    } else if (e instanceof Error && e.name === 'AdminAbortedError') {
+      // An admin aborted the recording which means we don't want to retry recording.
+      // we return <void> which causes the 'record' Task to be marked as successful.
+      console.log(`clear as day, that is an AdminAbortedError! ❤️`)
+      return
+    } else if (e instanceof Error && e.name === 'DownloadFailedError') {
+      throw e
+    } else if (e instanceof Error && e.message === 'no tomes available') {
+      console.error(`Received a 'no tomes available' error from S3 which usually means they're temporarily overloaded.`)
+      throw e
+    } else if (e instanceof Error && e.name === 'UploadFailedError') {
+      throw e
+    } else {
+      console.error(`!!!!!!!!!!!!!! 🚩🚩🚩 handleExceptions did not find a known scenario which should probably never happen. Please patch the code to handle this scenario.`)
+      console.error((e instanceof Error) ? `(e instanceof Error)=${(e instanceof Error)}, e.message='${e.message}', e.name='${e.name}'` : JSON.stringify(e))
+    }
+  }
  async updateDatabaseRecords() {
    await this.updateSegmentBytes()
    await this.checkForAborted()
  }

  async checkForAborted() {
    const recording = await getRecordingRelatedToVod(this.vodId)
    if (!recording) throw new Error(`failed to get recording related to vodId=${this.vodId}`);
    if (recording.is_aborted) {
      this.abortController.abort()
    }
  }

  async updateSegmentBytes() {
    for (const [index, segment] of this.segments.entries()) {
      if (segment.id) {
        console.log(`updateSegmentBytes() Segment ${index} -- segment.id=${segment.id} segments.bytes=${segment.bytes}`)
        await updateSegmentInDatabase({ segment_id: segment.id, fileSize: segment.bytes })
      }
    }
  }

  startProgressReports() {
    this.databaseUpdateTimer = setInterval(async () => {
      try {
        await this.updateDatabaseRecords()
      } catch (e) {
        console.error(`during startProgressReports(), we encountered the following error.`)
        console.error(e)
      }
    }, this.updateTimeout)
  }

  stopProgressReports() {
    clearInterval(this.databaseUpdateTimer)
  }
-  /**
-   * done() waits for the recording to be complete.
-   */
+  /**
+   * isTryingDownload
+   *
+   * There are always more tries unless the stream has been offline for greater than 5 minutes.
+   */
+  isTryingDownload() {
+    const isSegmentPresent = (this.segments && this.segments?.length > 0)
+    if (!isSegmentPresent) return true;
+    const latestSegment = this.segments.at(-1)
+    const hasUpdatedTimestamp = latestSegment?.updated_at
+    if (!hasUpdatedTimestamp) throw new Error('latestSegment does not have an updated_at property');
+    const fiveMinsAgo = sub(new Date(), { minutes: 5 })
+    const lastUpdatedAt = latestSegment.updated_at
+    return !isBefore(lastUpdatedAt, fiveMinsAgo) // keep trying only while the last update is newer than the five-minute cutoff
+  }
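A worked example of the five-minute cutoff, using the same date-fns helpers (the timestamps are illustrative):

    import { isBefore, sub } from 'date-fns'

    const now = new Date('2024-09-25T18:10:00Z')
    const lastUpdatedAt = new Date('2024-09-25T18:03:00Z') // last segment byte arrived 7 minutes ago
    const fiveMinsAgo = sub(now, { minutes: 5 })           // 18:05:00Z
    isBefore(lastUpdatedAt, fiveMinsAgo)                   // true → offline too long → stop retrying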
  /**
   * done()
   *
   * Repeatedly download segments until there is no more stream.
   * Stream is considered no-more once it has been offline for >5 minutes.
   * When downloading is done, upload segments to S3.
   *
   * input: stream URL, such as 'https://chaturbate.com/projektmelody'
   * output: list of S3Files, such as [{ key: 'example1.ts' }, { key: 'example2.ts' }]
   */
  async done() {
+    this.startProgressReports();
    try {
-      this.startProgressReports()
-      await this.download()
+      await this.downloadSegments();
+      await this.uploadSegments();
    } catch (e) {
-      switch (e) {
-        case (e instanceof Error && e.name === 'RoomOfflineError'):
-          // if the room is offline, we re-throw the RoomOfflineError so the recording gets retried
-          // we do this because the offline might be a temporary situation.
-          // e.g. streamer's computer bluescreened and they're coming back after they reboot.
-          throw e
-        case (e instanceof Error && e.name === 'AdminAbortedError'):
-          // An admin aborted which means we don't want to retry.
-          // we return and the Task gets marked as successful.
-          return
-        case (e instanceof Error && e.name === 'DownloadFailedError'):
-          throw e
-        default:
-          console.error(`!!!!!!!!!!!!!! switch/case (download section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
-          console.error((e instanceof Error) ? e.message : JSON.stringify(e))
-      }
+      console.error(`An error was encountered during done() function. This should not happen under nominal scenarios. This may be a bug; please investigate.`)
+      throw e
    } finally {
-      // download is done, so we upload the segment to S3.
-      try {
-        await this.upload()
-      } catch (e) {
-        switch (e) {
-          case (e instanceof Error && e.message === 'no tomes available'):
-            console.error(`Received a 'no tomes available' error from S3 which usually means they're temporarily overloaded.`)
-            throw e
-          case (e instanceof Error && e.name === 'UploadFailedError'):
-            throw e
-          default:
-            console.error(`!!!!!!!!!!!!!! switch/case (upload section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
-            console.error((e instanceof Error) ? e.message : JSON.stringify(e))
-        }
-      } finally {
-        // @todo - [ ] send S3 complete upload command if necessary
-        console.info(`finally block. Cleaning up.`)
-        this.stopProgressReports()
-      }
+      this.stopProgressReports();
    }
  }

+  /**
+   * downloadSegment
+   *
+   * Download a single segment.
+   * * Creates segment in the database
+   * * Pushes the segment to this.segments
+   */
+  async downloadSegment() {
+    const s3_key = `${nanoid()}.ts`
+    const segment = await createSegment(s3_key, this.vodId)
+    if (!segment.id) {
+      throw new Error(`Failed to createSegment(). segment.id was missing.`);
+    }
+    console.log(`New segment created. @see http://localhost:9000/segments?id=eq.${segment.id}&select=*,vods(*,recordings(*))`)
+    this.segments.push(segment)
+    const { pipeline, ffmpegStream } = (await RecordNextGeneration._dl(this.url, s3_key))
+    if (this.downloadStream) throw new Error(`If you see this error, there is a bug in your code. downloadSegment() tried to use this.downloadStream but it was already being used by some other part of the code. Please refactor so this.downloadStream is not used by more than one function at any given time.`);
+    this.downloadStream = ffmpegStream
+    ffmpegStream.on('data', (data) => {
+      let mySegment = this.segments.find((s) => s.id === segment.id)
+      if (mySegment) {
+        mySegment.bytes += data.length;
+      }
+    })
+    await pipeline
+    delete this.downloadStream // cleanup so another iteration can use it
+  }

+  /**
+   * downloadSegments
+   *
+   * Fault-tolerant segment downloader.
+   * * Creates segments in the database.
+   * * Handles common errors
+   * * Retries until the stream has been offline for >5 minutes.
+   * * Recursively called
+   */
+  async downloadSegments(): Promise<void> {
+    try {
+      await this.downloadSegment()
+    } catch (e) {
+      if (e instanceof Error && e.name === 'RoomOfflineError') {
+        // If the room is offline, then we want to retry immediately.
+        // We do this because the offline room might be a temporary situation.
+        // e.g. streamer's computer bluescreened and they're coming back after they reboot.
+        // If the room has been offline for >5 minutes, then we consider the stream concluded and we return.
+        if (this.isTryingDownload()) {
+          return this.downloadSegments()
+        } else {
+          return
+        }
+      } else if (e instanceof Error && e.name === 'PlaylistFailedError') {
+        // sometimes @futureporn/scout fails to get the playlist URL. We want to immediately try again.
+        return this.downloadSegments()
+      } else if (e instanceof Error && e.name === 'AdminAbortedError') {
+        // An admin aborted the recording which means we don't want to retry recording.
+        // we return <void> which causes the 'record' Task to be marked as successful.
+        console.log(`Clear as day, that is an AdminAbortedError! ❤️`)
+        return
+      } else if (e instanceof Error && e.name === 'DownloadFailedError') {
+        console.error(`We encountered a DownloadFailedError. I'm unsure why this happens. I guess we will retry.`)
+        return this.downloadSegments()
+      }
    }
  }
  /**
   * uploadSegments
   *
   * Fault-tolerant segment uploader.
   * * Uploads local segment files to S3
   * * Handles common errors
   * * Retries each segment up to 9 times
   */
  async uploadSegments() {
    try {
      for (const segment of this.segments) {
        await this.uploadSegment(segment)
      }
    } catch (e) {
      console.error('error during uploadSegments(). error as follows.')
      console.error(e)
      throw e
    }
  }
  async uploadSegment(segment: SegmentResponse) {
    const diskPath = join(tmpdir(), segment.s3_key)
    const key = segment.s3_key
    const client = this.getS3Client()
    await pRetry(async (attemptCount: number) => {
      console.log(`uploadSegment() attempt ${attemptCount}`)
      if (!this.s3Client) throw new Error('S3Client was missing; getS3Client() should have initialized it.')
      const { uploadInstance } = (await RecordNextGeneration._ul(client, diskPath, key))
      uploadInstance.on('httpUploadProgress', (progress) => this.onUploadProgress(progress))
      return uploadInstance.done()
    }, {
      onFailedAttempt: (e) => {
        console.error(`failed to uploadSegment() with the following error. Retrying.`)
        console.error(e)
      },
      retries: 9
    })
  }
} }
  // async done_old() {
  //   /**
  //    * Errors thrown inside the setTimeout callback will end up as an uncaughtException.
  //    * We handle those errors here to prevent node from exiting.
  //    */
  //   process.on('uncaughtException', (e) => {
  //     this.stopProgressReports()
  //     console.log(`!!! 🚩 WE HAVE CAUGHT AN UNCAUGHT EXCEPTION. (This should never occur. This is probably a bug that needs to be fixed.) error as follows.`)
  //     console.log(e)
  //     process.exit(69)
  //   })
  //   try {
  //     const client = this.getS3Client()
  //     console.log(`>> 1. Segment downloading phase`)
  //     while (this.isTryingDownload()) {
  //       if (!this.databaseUpdateTimer) this.startProgressReports();
  //       const s3_key = `${nanoid()}.ts`
  //       const segment = await createSegment(s3_key, this.vodId)
  //       if (!segment.id) {
  //         console.log('the following is segment')
  //         console.log(segment)
  //         throw new Error(`received invalid segment from db fetch()`);
  //       }
  //       this.segments.push(segment)
  //       console.log(`New segment created. @see http://localhost:9000/segments?id=eq.${segment.id}&select=*,vods(*,recordings(*))`)
  //       const { pipeline, ffmpegStream } = (await RecordNextGeneration._dl(this.url, s3_key))
  //       this.downloadStream = ffmpegStream
  //       ffmpegStream.on('data', (data) => {
  //         let mSegment = this.segments.find((s) => s.id === segment.id)
  //         if (mSegment) {
  //           mSegment.bytes += data.length;
  //         }
  //       })
  //       await pipeline
  //     }
  //     console.log(`>> 2. Segment uploading phase`)
  //   } catch (e) {
  //     this.stopProgressReports()
  //     return this.handleExceptions(e, '_dl()|_ul()')
  //   }
  // }
// }

View File

@ -0,0 +1,28 @@
import { setInterval, setTimeout } from 'node:timers/promises';

async function task() {
  console.log('Task is running:', new Date().toISOString());
  await setTimeout(1000); // Simulating async work with 1 second delay
  console.log('Task completed:', new Date().toISOString());
}

async function main() {
  console.log('Main function started');

  // Run the async function on an interval
  (async () => {
    for await (const _ of setInterval(5000)) { // 5 seconds interval
      task(); // Running async task without waiting for it to complete
    }
  })();

  // Non-blocking logic in the main function with a for loop
  for (let i = 1; i <= 5; i++) {
    console.log(`Main function loop iteration ${i}`);
    await setTimeout(2000); // Simulate async work (non-blocking) with 2 seconds delay
  }

  console.log('Main function loop completed');
}

main();
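If overlapping runs were undesirable, awaiting task() inside the loop would serialize them; a minimal variation of the loop above:

    for await (const _ of setInterval(5000)) {
      await task() // the next interval tick is not consumed until this run finishes
    }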

View File

@@ -1,97 +1,48 @@
-import updateSegmentInDatabase from '@futureporn/fetchers/updateSegmentInDatabase.ts'
import { Helpers, type Task } from 'graphile-worker'
-import Record from '../Record.ts'
-import type { SegmentResponse } from '@futureporn/types'
-import { configs } from '../config.ts'
-import { createId } from '@paralleldrive/cuid2'
-import createSegmentInDatabase from '@futureporn/fetchers/createSegmentInDatabase.ts'
-import createSegmentsVodLink from '@futureporn/fetchers/createSegmentsVodLink.ts'
-import getPlaylistUrl from '@futureporn/fetchers/getPlaylistUrl.ts'
-import getVod from '@futureporn/fetchers/getVod.ts'
import RecordNextGeneration from '../RecordNextGeneration.ts'
+import createVod from '@futureporn/fetchers/createVod.ts'
+import findOrCreateStream from '@futureporn/fetchers/findOrCreateStream.ts'
+import findOrCreateVtuber from '@futureporn/fetchers/findOrCreateVtuber.ts'
+import getRecording from '@futureporn/fetchers/getRecording.ts'
+import patchRecording from '@futureporn/fetchers/patchRecording.ts'

-/**
- * url is the URL to be recorded. Ex: chaturbate.com/projektmelody
- * recordId is the ID of the record record in postgres
- * we use the ID to poll the db to see if the job is aborted by the user
- */
interface Payload {
-  url: string;
-  vod_id: string;
+  recording_id: string;
+  // url: string;
+  // discord_message_id?: string;
+  // vod_id?: string;
+  // date?: string;
}

function assertPayload(payload: any): asserts payload is Payload {
  if (typeof payload !== "object" || !payload) throw new Error("invalid payload");
-  if (typeof payload.url !== "string") throw new Error("invalid url");
-  if (typeof payload.vod_id !== "string") throw new Error(`invalid vod_id=${payload.vod_id}`);
+  if (typeof payload.recording_id !== "string") throw new Error("invalid recording_id");
}

+async function assertVod(payload: Payload) {
+  let { recording_id } = payload
+  const recording = await getRecording(recording_id)
+  if (!recording) throw new Error(`failed to getRecording id=${recording_id}`);
+  let { url, vod_id, date } = recording
+  if (vod_id) return { vodId: vod_id };
+  if (!date) date = new Date().toISOString();
+  const vtuberId = await findOrCreateVtuber({ url })
+  const streamId = await findOrCreateStream({ vtuberId, date: new Date(date) })
+  if (!streamId) throw new Error(`failed to find or create a Stream in database`);
+  const vod = await createVod({ stream_id: streamId, vtuber: vtuberId, url, date, recording_id })
+  if (!vod) throw new Error('failed to createVod. please try again.')
+  return { vodId: vod.id }
+}
// async function getRecordInstance(url: string, segment_id: string, helpers: Helpers) {
// helpers.logger.info(`getRecordInstance() with url=${url}, segment_id=${segment_id}`)
// const abortController = new AbortController()
// const abortSignal = abortController.signal
// const accessKeyId = configs.s3AccessKeyId;
// const secretAccessKey = configs.s3SecretAccessKey;
// const region = configs.s3Region;
// const endpoint = configs.s3Endpoint;
// const bucket = configs.s3UscBucket;
// const playlistUrl = await getPlaylistUrl(url)
// if (!playlistUrl) throw new Error('failed to getPlaylistUrl');
// helpers.logger.info(`playlistUrl=${playlistUrl}`)
// const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint })
// const inputStream = Record.getFFmpegStream({ url: playlistUrl })
// const onProgress = (fileSize: number) => {
// updateSegmentInDatabase({ segment_id, fileSize, helpers })
// .then(checkIfAborted)
// .then((isAborted) => {
// isAborted ? abortController.abort() : null
// })
// .catch((e) => {
// helpers.logger.error('caught error while updatingDatabaseRecord inside onProgress inside getRecordInstance')
// helpers.logger.error(e)
// })
// }
// const record = new Record({ inputStream, onProgress, bucket, s3Client, jobId: ''+segment_id, abortSignal })
// return record
// }
// function checkIfAborted(segment: Partial<SegmentResponse>): boolean {
// return (!!segment?.vod?.is_recording_aborted)
// }
/**
* # doRecordSegment
*
* Record a segment of a livestream using ffmpeg.
*
 * Ideally, we record the entire livestream, but the universe is not so kind. Network interruptions are common, so we handle the situation as best we can.
*
 * This function creates a new segments row and a vods_segments_links row in the db via the PostgREST API.
*
* This function also names the S3 file (s3_key) with a datestamp and a cuid.
*/
// const doRecordSegment = async function doRecordSegment(url: string, vod_id: string, helpers: Helpers) {
// const s3_key = `${new Date().toISOString()}-${createId()}.ts`
// helpers.logger.info(`let's create a segment using vod_id=${vod_id}, url=${url}`)
// const segment_id = await createSegmentInDatabase(s3_key, vod_id)
// helpers.logger.info(`let's create a segmentsStreamLink...`)
// const segmentsVodLinkId = await createSegmentsVodLink(vod_id, segment_id)
// helpers.logger.info(`doTheRecording with createSegmentsVodLink segmentsVodLinkId=${segmentsVodLinkId}, vod_id=${vod_id}, segment_id=${segment_id}, url=${url}`)
// // no try-catch block here, because we need any errors to bubble up.
// const record = await getRecordInstance(url, segment_id)
// helpers.logger.info(`we got a Record instance. now we record.start()`)
// // console.log(record)
// return record.start()
// }
@ -99,65 +50,23 @@ export const record: Task = async function (payload: unknown, helpers: Helpers)
assertPayload(payload)
const { recording_id: recordingId } = payload
const recording = await getRecording(recordingId)
if (!recording) throw new Error(`failed to getRecording() ${recordingId}`);
const { url } = recording
const { vodId } = await assertVod(payload)
// await patchRecording(recording.id, { vod_id: vodId })
/**
 * RecordNextGeneration handles errors for us and re-throws ones that should fail the Task.
 * We intentionally do not use a try/catch block here.
 */
const recordNG = new RecordNextGeneration({ url, vodId })
await recordNG.done()
return;
// try {
// // if the VOD has been aborted, end Task with success
// if ((await getVod(vod_id, helpers))?.is_recording_aborted) return;
// /**
// * We do an exponential backoff timer when we record. If the Record() instance throws an error, we try again after a delay.
// * This will take effect only when Record() throws an error.
// * If however Record() returns, as is the case when the stream ends, this backoff timer will not retry.
// * This does not handle the corner case where the streamer's internet temporarily goes down, and their stream drops.
// *
// * @todo We must implement retrying at a higher level, and retry a few times to handle this type of corner-case.
// */
// // await backOff(() => doRecordSegment(url, recordId, helpers))
// await doRecordSegment(url, vodId, helpers)
// } catch (e) {
// // await updateDatabaseRecord({ recordId: vod_id, recordingState: 'failed' })
// helpers.logger.error(`caught an error during record Task`)
// if (e instanceof Error) {
// helpers.logger.info(`error.name=${e.name}`)
// if (e.name === 'RoomOfflineError') {
// // If room is offline, we want to retry until graphile-worker retries expire.
// // We don't want to swallow the error so we simply log the error then let the below throw re-throw the error
// // graphile-worker will retry when we re-throw the error below.
// helpers.logger.info(`Room is offline.`)
// } else if (e.name === 'AbortError') {
// // If the recording was aborted by an admin, we want graphile-worker to stop retrying the record job.
// // We swallow the error and return in order to mark the job as succeeded.
// helpers.logger.info(`>>> we got an AbortError so we are ending the record job.`)
// return
// } else {
// helpers.logger.error(e.message)
// }
// } else {
// helpers.logger.error(JSON.stringify(e))
// }
// // we throw the error which fails the graphile-worker job, thus causing graphile-worker to restart/retry the job.
// helpers.logger.error(`we got an error during record Task so we throw and retry`)
// throw e
// }
// helpers.logger.info('record Task has finished')
}
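For context, here is a minimal sketch of the contract RecordNextGeneration.done() is expected to honor, reconstructed from the commented-out error handling above; everything except the class name, constructor arguments, and done() is an assumption:

// a sketch, not the real implementation: done() swallows user aborts so
// graphile-worker marks the job as succeeded, and re-throws everything else
// (RoomOfflineError included) so graphile-worker retries the Task.
class RecordNextGenerationContract {
  constructor(private opts: { url: string; vodId: string }) {}

  async done(): Promise<void> {
    try {
      await this.recordUntilStreamEnds() // hypothetical internal method
    } catch (e) {
      if (e instanceof Error && e.name === 'AbortError') return; // aborted by an admin; succeed
      throw e // offline room, network failure, etc.; fail the Task so graphile-worker retries it
    }
  }

  private async recordUntilStreamEnds(): Promise<void> { /* ffmpeg + S3 upload loop */ }
}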

View File

@ -2,4 +2,10 @@
Factory takes raw materials (video segments) and produces an end product (encoded video, thumbnail)
factory has a big disk and lots of RAM in order to do transcoding tasks
## 240p encodes
ffmpeg -i ./pmel-cb-2023-03-04.mp4 -vf scale=-2:240 -b:v 368k -b:a 45k ./projektmelody-chaturbate-2023-03-04_240p.mp4
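Note: scale=-2:240 pins the height at 240px and picks an even width that preserves the aspect ratio (libx264 rejects odd dimensions); the ~368k video / 45k audio bitrates keep the preview encode small.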

View File

@ -0,0 +1,35 @@
-- recordings table schema
CREATE TABLE api.recordings (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
url TEXT NOT NULL,
created_at timestamp(6) without time zone,
updated_at timestamp(6) without time zone,
discord_message_id TEXT
);
-- roles & permissions for our backend automation user
GRANT all ON api.recordings TO automation;
GRANT SELECT ON api.recordings TO web_anon;
-- we re-create this function to use recording_id instead of vod_id
DROP FUNCTION public.tg__add_record_job CASCADE;
CREATE FUNCTION public.tg__add_record_job() RETURNS trigger
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'pg_catalog', 'public', 'pg_temp'
AS $$
begin
PERFORM graphile_worker.add_job('record', json_build_object(
'url', NEW.url,
'recording_id', NEW.id
), max_attempts := 6);
return NEW;
end;
$$;
CREATE TRIGGER create_recording
AFTER UPDATE ON api.recordings
FOR EACH ROW
EXECUTE FUNCTION tg__add_record_job();
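A minimal sketch of the consumer side, assuming the 'record' Task is exported from ./tasks/record.ts and the connection string comes from an environment variable; the key in the task list must match the first argument to graphile_worker.add_job():

import { run, type TaskList } from 'graphile-worker'
import { record } from './tasks/record.ts' // assumed path

const taskList: TaskList = { record }

// graphile-worker polls the same postgres database the trigger writes to
await run({
  connectionString: process.env.DATABASE_URL, // assumed env var
  concurrency: 1,
  taskList,
})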

View File

@ -0,0 +1,6 @@
ALTER TABLE IF EXISTS api.recordings
ALTER COLUMN created_at SET DEFAULT now();
ALTER TABLE IF EXISTS api.recordings
ALTER COLUMN updated_at SET DEFAULT now();

View File

@ -0,0 +1,8 @@
-- create discord_interactions table
CREATE TABLE api.discord_interactions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
discord_message_id TEXT NOT NULL
);
GRANT all ON api.discord_interactions TO automation;
GRANT SELECT ON api.discord_interactions TO web_anon;

View File

@ -0,0 +1,14 @@
-- add more cols to api.recordings
ALTER TABLE api.recordings
ADD COLUMN date TIMESTAMP(6) WITHOUT TIME ZONE;
ALTER TABLE api.recordings
ADD COLUMN vod_id UUID REFERENCES api.vods(id);
ALTER TABLE api.recordings
DROP COLUMN discord_message_id;
ALTER TABLE api.recordings
ADD COLUMN discord_interaction_id UUID REFERENCES api.discord_interactions(id);

View File

@ -0,0 +1,8 @@
DROP TRIGGER create_recording ON api.recordings;
CREATE TRIGGER recording_create
AFTER INSERT ON api.recordings
FOR EACH ROW
EXECUTE PROCEDURE public.tg__add_record_job('record');
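With the trigger now firing on INSERT, creating a row is enough to kick off a recording. A hedged example via PostgREST, assuming it serves the api schema at this base URL and the caller holds a JWT for the automation role (web_anon is read-only):

const res = await fetch('http://localhost:9000/recordings', { // base URL is an assumption
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    'Authorization': `Bearer ${process.env.AUTOMATION_JWT}`, // hypothetical token
    'Prefer': 'return=representation',
  },
  body: JSON.stringify({ url: 'https://chaturbate.com/projektmelody' }),
})
const [recording] = await res.json()
// the AFTER INSERT trigger has now enqueued a 'record' job carrying recording.id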

View File

@ -0,0 +1,6 @@
-- move is_aborted COL from vods to recordings
ALTER TABLE api.vods
DROP COLUMN is_recording_aborted;
ALTER TABLE api.recordings
ADD COLUMN is_aborted BOOLEAN NOT NULL DEFAULT FALSE;

View File

@ -0,0 +1,3 @@
ALTER TABLE api.vods
ADD COLUMN segments UUID REFERENCES api.segments(id);

View File

@ -0,0 +1,29 @@
/*
we already have a many-to-one relation. this col is creating a one-to-many relation, causing the following problem.
{
"code": "PGRST201",
"details": [
{
"cardinality": "one-to-many",
"embedding": "segments with vods",
"relationship": "vods_segments_fkey using segments(id) and vods(segments)"
},
{
"cardinality": "many-to-one",
"embedding": "segments with vods",
"relationship": "segments_vod_id_fkey using segments(vod_id) and vods(id)"
}
],
"hint": "Try changing 'vods' to one of the following: 'vods!vods_segments_fkey', 'vods!segments_vod_id_fkey'. Find the desired relationship in the 'details' key.",
"message": "Could not embed because more than one relationship was found for 'segments' and 'vods'"
}
*/
ALTER TABLE api.vods
DROP COLUMN segments;
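Following the PGRST201 hint, the embed could also have been disambiguated per-request instead of dropping the column, by naming the foreign key explicitly; a sketch, assuming the same PostgREST base URL:

// 'vods!segments_vod_id_fkey' tells PostgREST which of the two relationships to embed
const res = await fetch(
  'http://localhost:9000/segments?select=*,vods!segments_vod_id_fkey(*)' // assumed base URL
)
const segmentsWithVods = await res.json()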

View File

@ -0,0 +1,27 @@
/*
we already have a many-to-one relation. this col is creating a one-to-many relation, causing the following problem.
{
"code": "PGRST201",
"details": [
{
"cardinality": "one-to-many",
"embedding": "segments with vods",
"relationship": "vods_segments_fkey using segments(id) and vods(segments)"
},
{
"cardinality": "many-to-one",
"embedding": "segments with vods",
"relationship": "segments_vod_id_fkey using segments(vod_id) and vods(id)"
}
],
"hint": "Try changing 'vods' to one of the following: 'vods!vods_segments_fkey', 'vods!segments_vod_id_fkey'. Find the desired relationship in the 'details' key.",
"message": "Could not embed because more than one relationship was found for 'segments' and 'vods'"
}
*/
ALTER TABLE api.recordings
DROP COLUMN vod_id;

View File

@ -0,0 +1,3 @@
ALTER TABLE api.recordings
ADD COLUMN vod_id UUID REFERENCES api.vods(id);

View File

@ -0,0 +1,3 @@
ALTER TABLE api.vods
ADD COLUMN recording_id UUID REFERENCES api.recordings(id);

View File

@ -0,0 +1,4 @@
-- potentially unnecessary fk, because api.vods has the relation too.
ALTER TABLE api.recordings
DROP COLUMN vod_id;

View File

@ -32,7 +32,7 @@ export class RoomOfflineError extends Error {
export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = 0): Promise<string|null> {
console.log(`getPlaylistUrl roomUrl=${roomUrl} proxy=${proxy} retries=${retries}`)
// '-4' forces IPv4; '-g' prints the playlist URL without downloading (yt-dlp-style flags)
let args = ['-4', '-g', roomUrl]
if (proxy) {
console.log(`proxy=${proxy}, HTTP_PROXY=${configs.httpProxy}`)
args = args.concat(['--proxy', configs.httpProxy])