Compare commits
2 Commits
d89bf4fdea
...
5af878c3a3
Author | SHA1 | Date |
---|---|---|
CJ_Clippy | 5af878c3a3 | |
CJ_Clippy | 92cce429b3 |
26
Tiltfile
26
Tiltfile
|
@ -238,12 +238,7 @@ docker_build(
|
|||
|
||||
|
||||
load('ext://uibutton', 'cmd_button')
|
||||
cmd_button('postgres:create',
|
||||
argv=['./scripts/postgres-create.sh'],
|
||||
resource='postgresql-primary',
|
||||
icon_name='dataset',
|
||||
text='create (empty) databases',
|
||||
)
|
||||
|
||||
cmd_button('postgres:restore',
|
||||
argv=['./scripts/postgres-restore.sh'],
|
||||
resource='postgresql-primary',
|
||||
|
@ -257,18 +252,19 @@ cmd_button('postgres:drop',
|
|||
text='DROP all databases'
|
||||
)
|
||||
cmd_button('postgres:refresh',
|
||||
argv=['sh', './scripts/postgres-refresh.sh'],
|
||||
argv=['echo', '@todo please restart postgrest container manually.'],
|
||||
resource='migrations',
|
||||
icon_name='refresh',
|
||||
text='Refresh schema cache'
|
||||
)
|
||||
|
||||
# cmd_button('capture-api:create',
|
||||
# argv=['http', '--ignore-stdin', 'POST', 'http://localhost:5003/api/record', "url='https://twitch.tv/ironmouse'", "channel='ironmouse'"],
|
||||
# resource='capture-api',
|
||||
# icon_name='send',
|
||||
# text='Start Recording'
|
||||
# )
|
||||
## @todo let's make this get a random room from scout then use the random room to record via POST /recordings
|
||||
cmd_button('capture-worker:create',
|
||||
argv=['./scripts/capture-integration.sh'],
|
||||
resource='capture-worker',
|
||||
icon_name='send',
|
||||
text='Recording Integration Test'
|
||||
)
|
||||
|
||||
cmd_button('postgres:migrate',
|
||||
argv=['./scripts/postgres-migrations.sh'],
|
||||
|
@ -536,6 +532,10 @@ k8s_resource(
|
|||
labels=['backend'],
|
||||
resource_deps=['postgrest', 'postgresql-primary'],
|
||||
)
|
||||
k8s_resource(
|
||||
workload='chihaya',
|
||||
labels=['backend']
|
||||
)
|
||||
k8s_resource(
|
||||
workload='postgrest',
|
||||
port_forwards=['9000'],
|
||||
|
|
|
@ -2,22 +2,6 @@
|
|||
|
||||
|
||||
|
||||
# ---
|
||||
# apiVersion: v1
|
||||
# kind: Service
|
||||
# metadata:
|
||||
# name: capture-api
|
||||
# namespace: futureporn
|
||||
# spec:
|
||||
# type: ClusterIP
|
||||
# selector:
|
||||
# app.kubernetes.io/name: capture
|
||||
# ports:
|
||||
# - name: http
|
||||
# port: {{ .Values.capture.api.port }}
|
||||
# targetPort: http
|
||||
# protocol: TCP
|
||||
|
||||
---
|
||||
apiVersion: apps/v1
|
||||
kind: Deployment
|
||||
|
@ -87,48 +71,3 @@ spec:
|
|||
memory: 1024Mi
|
||||
restartPolicy: Always
|
||||
|
||||
|
||||
# ---
|
||||
# apiVersion: apps/v1
|
||||
# kind: Deployment
|
||||
# metadata:
|
||||
# name: capture-api
|
||||
# namespace: futureporn
|
||||
# labels:
|
||||
# app.kubernetes.io/name: capture
|
||||
# spec:
|
||||
# replicas: {{ .Values.capture.api.replicas }}
|
||||
# selector:
|
||||
# matchLabels:
|
||||
# app: capture-api
|
||||
# template:
|
||||
# metadata:
|
||||
# labels:
|
||||
# app: capture-api
|
||||
# spec:
|
||||
# containers:
|
||||
# - name: capture
|
||||
# image: "{{ .Values.capture.imageName }}"
|
||||
# ports:
|
||||
# - name: http
|
||||
# containerPort: {{ .Values.capture.api.port }}
|
||||
# env:
|
||||
# - name: FUNCTION
|
||||
# value: api
|
||||
# - name: HTTP_PROXY
|
||||
# valueFrom:
|
||||
# secretKeyRef:
|
||||
# name: capture
|
||||
# key: httpProxy
|
||||
# - name: WORKER_CONNECTION_STRING
|
||||
# valueFrom:
|
||||
# secretKeyRef:
|
||||
# name: capture
|
||||
# key: workerConnectionString
|
||||
# - name: PORT
|
||||
# value: "{{ .Values.capture.api.port }}"
|
||||
# resources:
|
||||
# limits:
|
||||
# cpu: 100m
|
||||
# memory: 256Mi
|
||||
# restartPolicy: Always
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
import { configs } from "./config.ts";
|
||||
|
||||
|
||||
export default async function createRecording({ url, discordMessageId, date }: { url: string, discordMessageId?: string, date: Date }) {
|
||||
if (!url) throw new Error('createRecording requires {string} url');
|
||||
const payload = {
|
||||
url,
|
||||
discord_message_id: discordMessageId,
|
||||
date: date.toISOString()
|
||||
}
|
||||
const res = await fetch(`${configs.postgrestUrl}/recordings`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'Accept': 'application/json',
|
||||
'Prefer': 'return=representation',
|
||||
'Authorization': `Bearer ${configs.automationUserJwt}`
|
||||
},
|
||||
body: JSON.stringify(payload)
|
||||
})
|
||||
if (!res.ok) {
|
||||
const body = await res.text()
|
||||
const msg = `Failed to create Recording. status=${res.status}, statusText=${res.statusText}, body=${body}`
|
||||
console.error(msg)
|
||||
throw new Error(msg);
|
||||
}
|
||||
const data = await res.json() as { id: string }
|
||||
return data.id
|
||||
}
|
|
@ -0,0 +1,30 @@
|
|||
import { SegmentResponse } from '@futureporn/types';
|
||||
import { configs } from './config.ts'
|
||||
import querystring from 'node:querystring'
|
||||
|
||||
export default async function createSegment(s3_key: string, vod_id: string): Promise<SegmentResponse> {
|
||||
if (!s3_key) throw new Error('getSegments requires {string} s3_key as first arg');
|
||||
const segmentPayload = {
|
||||
s3_key,
|
||||
vod_id
|
||||
}
|
||||
const res = await fetch(`${configs.postgrestUrl}/segments`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'Accept': 'application/json',
|
||||
'Prefer': 'return=representation',
|
||||
'Authorization': `Bearer ${configs.automationUserJwt}`
|
||||
},
|
||||
body: JSON.stringify(segmentPayload)
|
||||
})
|
||||
if (!res.ok) {
|
||||
const body = await res.text()
|
||||
const msg = `failed to create Segment. status=${res.status}, statusText=${res.statusText}, body=${body}`
|
||||
console.error(msg)
|
||||
throw new Error(msg);
|
||||
}
|
||||
const data = await res.json() as SegmentResponse[]
|
||||
if (!data[0]) throw new Error('failed to createSegment! body[0] was missing.');
|
||||
return data[0]
|
||||
}
|
|
@ -1,37 +0,0 @@
|
|||
import { configs } from './config.ts'
|
||||
import querystring from 'node:querystring'
|
||||
|
||||
export default async function createSegmentInDatabase(s3_key: string, vod_id: string): Promise<string> {
|
||||
if (!s3_key) throw new Error('getSegments requires {string} s3_key as first arg');
|
||||
const segmentPayload = {
|
||||
s3_key,
|
||||
vod_id
|
||||
}
|
||||
// helpers.logger.info(`Creating segment with s3_key=${s3_key}. payload as follows`)
|
||||
// helpers.logger.info(JSON.stringify(segmentPayload))
|
||||
const res = await fetch(`${configs.postgrestUrl}/segments`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'Accept': 'application/json',
|
||||
'Prefer': 'return=headers-only',
|
||||
'Authorization': `Bearer ${configs.automationUserJwt}`
|
||||
},
|
||||
body: JSON.stringify(segmentPayload)
|
||||
})
|
||||
if (!res.ok) {
|
||||
const body = await res.text()
|
||||
const msg = `failed to create Segment. status=${res.status}, statusText=${res.statusText}, body=${body}`
|
||||
console.error(msg)
|
||||
throw new Error(msg);
|
||||
}
|
||||
const location = res.headers.get('location')
|
||||
if (!location) throw new Error(`failed to get location header in response from postgrest`);
|
||||
const parsedQuery = querystring.parse(location)
|
||||
const segmentsId = parsedQuery['/segments?id']
|
||||
if (!segmentsId) throw new Error('segmentsId was undefined which is unexpected');
|
||||
if (Array.isArray(segmentsId)) throw new Error('segmentsId was an array which is unexpected');
|
||||
const id = segmentsId.split('.').at(-1)
|
||||
if (!id) throw new Error('failed to get id ');
|
||||
return id
|
||||
}
|
|
@ -67,7 +67,7 @@ export async function createStream(): Promise<string|null> {
|
|||
*
|
||||
*/
|
||||
export default async function findOrCreateStream({ vtuberId, date, minutes = 15 }: { vtuberId: string, date: Date, minutes?: number }): Promise<string|null> {
|
||||
console.info(`findOrCreateStream with vtuberId=${vtuberId}, date=${date.toISOString()}, minutes=${minutes}`)
|
||||
console.info(`findOrCreateStream with vtuberId=${vtuberId}, date=${new Date(date).toISOString()}, minutes=${minutes}`)
|
||||
if (!vtuberId) throw new Error(`findOrCreateStream requires vruberId passed in the options argument.`);
|
||||
if (!date) throw new Error(`findOrCreateStream requires date passed in the options argument.`);
|
||||
const gteDate = sub(date, { minutes })
|
||||
|
|
|
@ -83,9 +83,9 @@ async function createVtuber(vtuber: Partial<VtuberRecord>): Promise<string> {
|
|||
throw new Error(msg)
|
||||
}
|
||||
const json = await res.json() as VtuberResponse[]
|
||||
console.info(`createVtuber with vtuber as follows`)
|
||||
console.info(vtuber)
|
||||
console.info(json)
|
||||
// console.info(`createVtuber with vtuber as follows`)
|
||||
// console.info(vtuber)
|
||||
// console.info(json)
|
||||
const vtuberData = json[0]
|
||||
if (!vtuberData) throw new Error('failed to createVtuber')
|
||||
return vtuberData.id
|
||||
|
@ -97,6 +97,7 @@ export default async function findOrCreateVtuber(query: Partial<vTuberSearchQuer
|
|||
const { url, name } = query
|
||||
if (!url) throw new Error('findOrCreateVtuber was missing url which is required');
|
||||
console.info(`findOrCreateVtuber. url=${url}, name=${name}`)
|
||||
new URL(url) // validate URL, throw if invalid
|
||||
|
||||
const foundVtuber = await findVtuber(query)
|
||||
if (!foundVtuber) {
|
||||
|
|
|
@ -21,7 +21,5 @@ export default async function findVod({ vod_id, discord_message_id }: { vod_id?:
|
|||
throw new Error(msg)
|
||||
}
|
||||
const json = await res.json() as VodResponse[]
|
||||
// console.info(`vod results as follows.`)
|
||||
// console.info(json)
|
||||
return json?.at(0) || null
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
import type { RecordingResponse } from "@futureporn/types";
|
||||
import { configs } from "./config.ts";
|
||||
|
||||
export default async function getRecording(recordingId: string): Promise<RecordingResponse|null> {
|
||||
if (!recordingId) throw new Error(`getRecording requires recordingId param, but it was missing.`);
|
||||
const fetchUrl = `${configs.postgrestUrl}/recordings?id=eq.${recordingId}`
|
||||
const fetchOptions = {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'Accept': 'application/json',
|
||||
'Prefer': 'return=representation'
|
||||
}
|
||||
}
|
||||
const res = await fetch(fetchUrl, fetchOptions)
|
||||
if (!res.ok) {
|
||||
const msg = `request failed. status=${res.status}, statusText=${res.statusText}`
|
||||
console.error(msg)
|
||||
throw new Error(msg)
|
||||
}
|
||||
const json = await res.json() as RecordingResponse[]
|
||||
return json?.at(0) || null
|
||||
}
|
||||
|
||||
|
||||
export async function getRecordingRelatedToVod(vodId: string): Promise<RecordingResponse|null> {
|
||||
if (!vodId) throw new Error(`getRecordingRelatedToVod requires vodId param, but it was missing.`);
|
||||
const fetchUrl = `${configs.postgrestUrl}/recordings?select=*,vods!inner(id)&vods.id=eq.${vodId}`
|
||||
const fetchOptions = {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'Accept': 'application/json',
|
||||
'Prefer': 'return=representation'
|
||||
}
|
||||
}
|
||||
const res = await fetch(fetchUrl, fetchOptions)
|
||||
if (!res.ok) {
|
||||
const msg = `request failed. status=${res.status}, statusText=${res.statusText}`
|
||||
console.error(msg)
|
||||
throw new Error(msg)
|
||||
}
|
||||
const json = await res.json() as RecordingResponse[]
|
||||
return json?.at(0) || null
|
||||
}
|
|
@ -1,8 +1,8 @@
|
|||
import { configs } from "./config"
|
||||
import type { VodResponse } from "@futureporn/types"
|
||||
|
||||
export default async function getVod(vodId: string) {
|
||||
const url = `${configs.postgrestUrl}/vods?select=*,segments(*)&id=eq.${vodId}`
|
||||
export default async function getVod(vodId: string): Promise<VodResponse|null> {
|
||||
const url = `${configs.postgrestUrl}/vods?select=*,segments(*),recording:recordings(is_aborted)&id=eq.${vodId}`
|
||||
try {
|
||||
const res = await fetch(url)
|
||||
if (!res.ok) {
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
import type { RecordingRecord } from "@futureporn/types";
|
||||
import { configs } from "./config.ts";
|
||||
|
||||
export default async function patchRecording(recordingId: string, payload: Partial<RecordingRecord>): Promise<void> {
|
||||
const url = `${configs.postgrestUrl}/recordings?id=eq.${recordingId}`
|
||||
const fetchOptions = {
|
||||
method: 'PATCH',
|
||||
headers: {
|
||||
'Authorization': `Bearer ${configs.automationUserJwt}`,
|
||||
'Content-Type': 'application/json',
|
||||
'Prefer': 'return=headers-only'
|
||||
},
|
||||
body: JSON.stringify(payload)
|
||||
}
|
||||
try {
|
||||
const res = await fetch(url, fetchOptions)
|
||||
if (!res.ok) {
|
||||
const body = await res.json()
|
||||
console.error(body)
|
||||
throw new Error(`Problem during patchRecording. res.status=${res.status}, res.statusText=${res.statusText}`)
|
||||
}
|
||||
return
|
||||
} catch (e) {
|
||||
console.error(e)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
|
@ -21,8 +21,8 @@ export default async function updateSegmentInDatabase({
|
|||
bytes: fileSize
|
||||
}
|
||||
|
||||
const fetchUrl =`${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=vod:vods(is_recording_aborted)`
|
||||
console.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`)
|
||||
const fetchUrl =`${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=vod:vods(recording:recordings(is_aborted))`
|
||||
// console.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`)
|
||||
const res = await fetch(fetchUrl, {
|
||||
method: 'PATCH',
|
||||
headers: {
|
||||
|
|
|
@ -120,6 +120,7 @@ export interface VodRecord {
|
|||
url: string;
|
||||
discord_message_id: string;
|
||||
s3_file: string;
|
||||
recording_id: string;
|
||||
}
|
||||
|
||||
export interface TagRecord {
|
||||
|
@ -134,6 +135,15 @@ export interface User {
|
|||
id: string
|
||||
}
|
||||
|
||||
export interface RecordingResponse {
|
||||
id: string;
|
||||
url: string;
|
||||
date: string;
|
||||
discord_interaction_id: string;
|
||||
is_aborted: boolean;
|
||||
vod_id: string;
|
||||
}
|
||||
|
||||
export interface VodResponse {
|
||||
id: string;
|
||||
stream: StreamResponse;
|
||||
|
@ -157,9 +167,9 @@ export interface VodResponse {
|
|||
note?: string;
|
||||
url: string;
|
||||
segments?: SegmentResponse[];
|
||||
recording: RecordingRecord;
|
||||
status: Status;
|
||||
discord_message_id: string;
|
||||
is_recording_aborted: boolean;
|
||||
}
|
||||
|
||||
|
||||
|
@ -177,7 +187,6 @@ export interface StreamRecord {
|
|||
archive_status: ArchiveStatus;
|
||||
is_chaturbate_stream: Boolean;
|
||||
is_fansly_stream: Boolean;
|
||||
is_recording_aborted: Boolean;
|
||||
status: Status;
|
||||
segments?: SegmentResponse[]
|
||||
}
|
||||
|
@ -203,16 +212,17 @@ export interface StreamResponse {
|
|||
|
||||
export interface RecordingRecord {
|
||||
id: number;
|
||||
recording_state: RecordingState;
|
||||
file_size: number;
|
||||
discord_message_id: string;
|
||||
is_recording_aborted: boolean;
|
||||
is_aborted: boolean;
|
||||
vod_id: string;
|
||||
updated_at: Date;
|
||||
created_at: Date;
|
||||
}
|
||||
|
||||
|
||||
export interface SegmentResponse {
|
||||
id: number;
|
||||
id: string;
|
||||
s3_key: string;
|
||||
s3_id: string;
|
||||
bytes: number;
|
||||
|
|
|
@ -30,7 +30,7 @@ describe('image', function () {
|
|||
describe('getStoryboard', function () {
|
||||
this.timeout(1000*60*15)
|
||||
it('should accept a URL and return a path to image on disk', async function () {
|
||||
const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-09-15.mp4'
|
||||
const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-09-25.mp4'
|
||||
const imagePath = await getStoryboard(url)
|
||||
expect(imagePath).to.match(/\.png/)
|
||||
})
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
#!/bin/bash
|
||||
|
||||
if [ -z "${AUTOMATION_USER_JWT}" ]; then
|
||||
echo "Error: AUTOMATION_USER_JWT variable is not defined."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# get a random room
|
||||
response=$(curl -sL --fail GET http://localhost:8134/chaturbate/random-room)
|
||||
exitcode=$?
|
||||
url=$(echo $response | jq -r '.url')
|
||||
if [[ $exitcode -ne 0 || -z "$response" || -z "$url" ]]; then
|
||||
echo "failed to get random room. exitcode=${exitcode}, response=${response}, url=${url}"
|
||||
exit $exitcode
|
||||
fi
|
||||
echo "Random online chaturbate room url=${url}"
|
||||
|
||||
|
||||
|
||||
# create a recording
|
||||
curl -sL -H "Authorization: Bearer ${AUTOMATION_USER_JWT}" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d '{"url": "'"${url}"'"}' \
|
||||
http://localhost:9000/recordings
|
||||
echo "finished creating recording"
|
|
@ -16,7 +16,7 @@ createCommand({
|
|||
if (!message) return bot.logger.error('interaction.message was missing');
|
||||
if (!message.id) return bot.logger.error(`interaction.message.id was missing`);
|
||||
|
||||
const url = `${configs.postgrestUrl}/vods?discord_message_id=eq.${message.id}`;
|
||||
const url = `${configs.postgrestUrl}/recordings?discord_message_id=eq.${message.id}`;
|
||||
const options = {
|
||||
method: 'PATCH',
|
||||
headers: {
|
||||
|
@ -26,7 +26,7 @@ createCommand({
|
|||
'Authorization': `Bearer ${configs.automationUserJwt}`
|
||||
},
|
||||
body: JSON.stringify({
|
||||
is_recording_aborted: true,
|
||||
is_aborted: true,
|
||||
status: 'aborted' as Status
|
||||
})
|
||||
};
|
||||
|
|
|
@ -13,6 +13,7 @@ import type { StreamResponse } from '@futureporn/types'
|
|||
import createVod from '@futureporn/fetchers/createVod.ts'
|
||||
import findOrCreateVtuber from '@futureporn/fetchers/findOrCreateVtuber.ts'
|
||||
import findOrCreateStream from '@futureporn/fetchers/findOrCreateStream.ts'
|
||||
import createRecording from '@futureporn/fetchers/createRecording.ts'
|
||||
|
||||
|
||||
/**
|
||||
|
@ -114,13 +115,10 @@ createCommand({
|
|||
}
|
||||
|
||||
const discord_message_id = message.id.toString()
|
||||
const discordMessageId = discord_message_id
|
||||
const date = new Date()
|
||||
const vtuberId = await findOrCreateVtuber({ url })
|
||||
const streamId = await findOrCreateStream({ vtuberId, date })
|
||||
if (!streamId) throw new Error(`failed to find or create a Stream in database`);
|
||||
const vod = await createVod({ stream_id: streamId, vtuber: vtuberId, url, discord_message_id, date: date.toISOString() })
|
||||
if (!vod) throw new Error('failed to createVod. please try again.')
|
||||
logger.info(`Success! We have created VOD id=${vod.id}`)
|
||||
|
||||
await createRecording({ url, discordMessageId, date })
|
||||
|
||||
} catch (e) {
|
||||
const message = `Record failed due to the following error.\n${e}`
|
||||
|
|
|
@ -1,5 +1,13 @@
|
|||
Task names uses underscores because graphile_worker expects them to be that way because graphile_worker interfaces with Postgresql which uses lowercase and numberscores.
|
||||
|
||||
here are some administrative functions for clearing all tasks. Also see https://worker.graphile.org/docs/admin-functions
|
||||
|
||||
(search tags, for easily finding this file by content)
|
||||
administrative tasks
|
||||
clear all
|
||||
delete all
|
||||
jobs
|
||||
addJob()
|
||||
|
||||
## Add job via SQL
|
||||
|
||||
|
|
|
@ -199,6 +199,34 @@ importers:
|
|||
specifier: ^5.5.4
|
||||
version: 5.5.4
|
||||
|
||||
../..: {}
|
||||
|
||||
../../packages/fetchers: {}
|
||||
|
||||
../../packages/infra: {}
|
||||
|
||||
../../packages/storage: {}
|
||||
|
||||
../../packages/types: {}
|
||||
|
||||
../../packages/utils: {}
|
||||
|
||||
../bot: {}
|
||||
|
||||
../factory: {}
|
||||
|
||||
../mailbox: {}
|
||||
|
||||
../migrations: {}
|
||||
|
||||
../next: {}
|
||||
|
||||
../scout: {}
|
||||
|
||||
../strapi: {}
|
||||
|
||||
../uppy: {}
|
||||
|
||||
packages:
|
||||
|
||||
'@aws-crypto/crc32@5.2.0':
|
||||
|
|
|
@ -1,14 +1,10 @@
|
|||
/**
|
||||
* RecordNextGeneration.ts
|
||||
*
|
||||
* @important @todo There is a MEMORY LEAK in here somewhere!
|
||||
* This causes OOMKiller to cull the capture process. Not good!
|
||||
*
|
||||
*/
|
||||
|
||||
import { VodResponse } from "@futureporn/types"
|
||||
import getVod from '@futureporn/fetchers/getVod.ts'
|
||||
import { Duplex, PassThrough, Readable, type Writable } from "stream"
|
||||
import { PassThrough, Readable, type Writable } from "stream"
|
||||
import { tmpdir } from 'node:os'
|
||||
import { join } from 'node:path'
|
||||
import { ua0 } from '@futureporn/utils/name.ts'
|
||||
|
@ -20,13 +16,18 @@ import { Upload, type Progress } from "@aws-sdk/lib-storage"
|
|||
import { S3Client, type S3ClientConfig } from '@aws-sdk/client-s3'
|
||||
import prettyBytes from "pretty-bytes"
|
||||
import updateSegmentInDatabase from "@futureporn/fetchers/updateSegmentInDatabase.ts"
|
||||
import createSegmentInDatabase from "@futureporn/fetchers/createSegmentInDatabase.ts"
|
||||
import { getRecordingRelatedToVod } from "@futureporn/fetchers/getRecording.ts"
|
||||
import createSegment from "@futureporn/fetchers/createSegment.ts"
|
||||
import createSegmentsVodLink from "@futureporn/fetchers/createSegmentsVodLink.ts"
|
||||
import { createReadStream, createWriteStream } from "fs"
|
||||
import pRetry, {AbortError} from 'p-retry'
|
||||
import { type SegmentResponse } from '@futureporn/types'
|
||||
import getPlaylistUrl from "@futureporn/fetchers/getPlaylistUrl.ts"
|
||||
import { isBefore, sub } from 'date-fns'
|
||||
|
||||
export interface RecordNextGenerationArguments {
|
||||
vodId: string;
|
||||
playlistUrl: string;
|
||||
url: string;
|
||||
}
|
||||
|
||||
export class AdminAbortedError extends Error {
|
||||
|
@ -54,6 +55,20 @@ export class UploadFailedError extends Error {
|
|||
}
|
||||
}
|
||||
|
||||
export class PlaylistFailedError extends Error {
|
||||
constructor(message?: string) {
|
||||
super(message)
|
||||
Object.setPrototypeOf(this, PlaylistFailedError.prototype)
|
||||
this.name = this.constructor.name
|
||||
this.message = `PlaylistFailedError. ${this.message}`
|
||||
}
|
||||
getErrorMessage() {
|
||||
return this.message
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
export class DownloadFailedError extends Error {
|
||||
constructor(message?: string) {
|
||||
super(message)
|
||||
|
@ -68,22 +83,33 @@ export class DownloadFailedError extends Error {
|
|||
|
||||
|
||||
/**
|
||||
* RecordNextGeneration
|
||||
* # RecordNextGeneration
|
||||
*
|
||||
* The function which records VODs in a Futureporn specific way.
|
||||
* The function which records VODs in a Futureporn specific, fault-tolerant way.
|
||||
*
|
||||
* Issues
|
||||
* ## Issues/TODO list
|
||||
*
|
||||
* @todo [x] onProgress stops firing
|
||||
* @todo [ ] OOMKilled seen via development environment
|
||||
* @todo [x] OOMKilled seen via development environment
|
||||
* @todo [ ] undefined behavior during CB private shows
|
||||
* @todo [ ] does not handle CB Hidden Camera ticket shows
|
||||
* @todo [ ] Upload segments in a way that does not interrupt downloading new segments.
|
||||
* There is an issue where a segment download ends, and the segment upload immediately begins.
|
||||
* At first glance this looks like good behavior, but what is happening during the segment upload is that the livestream
|
||||
* is continuing, but we aren't recording it anymore. We are using Backblaze, thus uploads are slow.
|
||||
* We miss a lot of the stream because the upload takes many minutes.
|
||||
* Instead of this behavior of immediately uploading after a segment download completes, we should upload once the livestream is finished,
|
||||
* OR we should upload while concurrently downloading the next segment.
|
||||
* @todo [ ] Move retrying from the {Task} `record` context to the class `RecordNextGeneration` context.
|
||||
* There is an issue where the `record` task needs to retry after a temporary failure, but it cannot because there aren't any available workers.
|
||||
* The solution is to not exit the `record` task at all, and instead keep the `record` task running, but suspended while a exponential backoff timer elapses.
|
||||
* This way, the worker stays focused on the recording and retries until the stream has been offline for n minutes, at which point `record` is complete.
|
||||
*
|
||||
*/
|
||||
export default class RecordNextGeneration {
|
||||
|
||||
public vodId: string;
|
||||
public segmentId?: string;
|
||||
public segmentVodLinkId?: string;
|
||||
public playlistUrl: string;
|
||||
public url: string;
|
||||
public s3Key?: string;
|
||||
public s3Bucket?: string;
|
||||
public s3Client?: S3Client;
|
||||
|
@ -92,25 +118,52 @@ export default class RecordNextGeneration {
|
|||
private downloadStream?: Readable;
|
||||
private uploadStream?: PassThrough;
|
||||
private uploadInstance?: Upload;
|
||||
private streamPipeline?: Promise<void>;
|
||||
private diskStream?: Writable;
|
||||
private uploadCounter: number;
|
||||
private downloadCounter: number;
|
||||
private databaseUpdateTimer?: NodeJS.Timeout;
|
||||
private updateTimeout: number;
|
||||
private abortController: AbortController;
|
||||
private segments: SegmentResponse[];
|
||||
private retries: number;
|
||||
|
||||
|
||||
|
||||
constructor({ vodId, playlistUrl }: RecordNextGenerationArguments) {
|
||||
constructor({ vodId, url }: RecordNextGenerationArguments) {
|
||||
this.vodId = vodId
|
||||
this.playlistUrl = playlistUrl
|
||||
this.url = url
|
||||
this.uploadCounter = 0
|
||||
this.downloadCounter = 0
|
||||
|
||||
|
||||
// const outputStream = createWriteStream('/dev/null')
|
||||
// setInterval(() => { inputStream.push('simulated downloader bytes received') }, 50)
|
||||
// setTimeout(() => { inputStream.destroy() })
|
||||
this.updateTimeout = 30*1000
|
||||
this.abortController = new AbortController()
|
||||
this.abortController.signal.addEventListener("abort", this.abortEventListener.bind(this))
|
||||
this.retries = 0
|
||||
this.segments = []
|
||||
}
|
||||
|
||||
async withRetry(fn: any, retries = 3) {
|
||||
return pRetry(fn, {
|
||||
onFailedAttempt: (e) => {
|
||||
console.error(`Error during attempt:`, e);
|
||||
},
|
||||
retries
|
||||
});
|
||||
}
|
||||
|
||||
abortEventListener() {
|
||||
console.log(`abortEventListener has been invoked. this.abortSignal is as follows`)
|
||||
console.log(this.abortController.signal)
|
||||
console.log(JSON.stringify(this.abortController.signal, null, 2))
|
||||
const reason = this.abortController.signal.reason
|
||||
if (this.downloadStream) {
|
||||
console.log(`aborted the stream download with reason=${reason}`)
|
||||
this.downloadStream.destroy(new AdminAbortedError())
|
||||
} else {
|
||||
console.warn(`downloadStream does not exist. Perhaps it has already been aborted?`)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
getMultipartUpload({
|
||||
client,
|
||||
bucket,
|
||||
|
@ -138,14 +191,13 @@ export default class RecordNextGeneration {
|
|||
params
|
||||
})
|
||||
|
||||
|
||||
/**
|
||||
* aws client docs recommend against using async onProgress handlers.
|
||||
* therefore, I'm only setting this.uploadCounter inside the syncronous handler and we call async updateSegmentInDatabase() elsewhere.
|
||||
*/
|
||||
const onProgress = (progress: Progress) => {
|
||||
if (progress?.loaded) {
|
||||
console.log(`Progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`)
|
||||
console.log(`Upload progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`)
|
||||
this.reportMemoryUsage()
|
||||
this.uploadCounter = progress.loaded
|
||||
}
|
||||
|
@ -155,64 +207,35 @@ export default class RecordNextGeneration {
|
|||
return upload
|
||||
}
|
||||
|
||||
// static deleteme () {
|
||||
|
||||
// // @todo there is a problem that results in COMPLETE LOSS OF SEGMENT DATA.
|
||||
// // when the download stream closes before the upload stream, I think the upload stream gets cut off.
|
||||
// // this means the upload isn't allowed to save and that means no data whatsoever gets put to S3.
|
||||
// // is that right? IDK what's happening, but we don't get any segment data on S3 at all??
|
||||
// // Ok I just checked the Backblaze dashboard and we are uploading. Backblaze says the bytes are at 0 but
|
||||
// // it also shows a partial upload of 550MB which matches what capture-worker is showing has been captured so far.
|
||||
// // So I think what is happening is the upload is happening, but it's not finishing.
|
||||
// // It looks like the finish is only allowed to happen under completely normal circumstances.
|
||||
// // However, the segment upload may fail in production, and we need to let the upload finish even then.
|
||||
// //
|
||||
// // I think I need to call CompleteMultipartUpload. https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
|
||||
// // Yes, that's right. Apparently parallelUploads3.done() returns a Promise which will resolve to CompleteMultipartUploadCommandOutput.
|
||||
// // But because of the catch, that promise will never resolve?
|
||||
// // What happens to an in-progress Promise when an error is thrown?
|
||||
|
||||
// maybe @see https://github.com/aws/aws-sdk-js-v3/issues/2694
|
||||
|
||||
// // await this.upload.done();
|
||||
// // console.log('Upload is complete.')
|
||||
// // return this.uploadStream
|
||||
// @todo there is a problem that results in COMPLETE LOSS OF SEGMENT DATA.
|
||||
// when the download stream closes before the upload stream, I think the upload stream gets cut off.
|
||||
// this means the upload isn't allowed to save and that means no data whatsoever gets put to S3.
|
||||
// is that right? IDK what's happening, but we don't get any segment data on S3 at all??
|
||||
// Ok I just checked the Backblaze dashboard and we are uploading. Backblaze says the bytes are at 0 but
|
||||
// it also shows a partial upload of 550MB which matches what capture-worker is showing has been captured so far.
|
||||
// So I think what is happening is the upload is happening, but it's not finishing.
|
||||
// It looks like the finish is only allowed to happen under completely normal circumstances.
|
||||
// However, the segment upload may fail in production, and we need to let the upload finish even then.
|
||||
//
|
||||
// I think I need to call CompleteMultipartUpload. https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
|
||||
// Yes, that's right. Apparently parallelUploads3.done() returns a Promise which will resolve to CompleteMultipartUploadCommandOutput.
|
||||
// But because of the catch, that promise will never resolve?
|
||||
// What happens to an in-progress Promise when an error is thrown?
|
||||
//
|
||||
// maybe @see https://github.com/aws/aws-sdk-js-v3/issues/2694
|
||||
|
||||
|
||||
// } catch (e) {
|
||||
// // if we got an abort error, e.name is not AbortError as expected. Instead, e.name is Error.
|
||||
// // so in order to catch AbortError, we don't even look there. instead, we check if our abortcontroller was aborted.
|
||||
// // in other words, `(e.name === 'AbortError')` will never be true.
|
||||
// if (this.abortSignal.aborted) {
|
||||
// console.error('While uploading, the upload was aborted.')
|
||||
// setTimeout(async () => {
|
||||
// await this.upload?.abort()
|
||||
// }, 1000)
|
||||
// // if (this.upload) {
|
||||
// // const command = new CompleteMultipartUploadCommand()
|
||||
// // }
|
||||
// return;
|
||||
// }
|
||||
static getDiskStream(s3Key?: string) {
|
||||
const tmpDiskPath = join(tmpdir(), s3Key || `${nanoid()}.ts`)
|
||||
return createWriteStream(tmpDiskPath, { encoding: 'utf-8' })
|
||||
}
|
||||
|
||||
// if (e instanceof Error) {
|
||||
// console.error(`We were uploading a file to S3 but then we encountered an exception!`)
|
||||
// console.error(e)
|
||||
// throw e
|
||||
// } else {
|
||||
// throw new Error(`error of some sort ${JSON.stringify(e, null, 2)}`)
|
||||
// }
|
||||
// }
|
||||
|
||||
// }
|
||||
|
||||
|
||||
|
||||
|
||||
static getFFmpegStream({ url }: { url: string }): Readable {
|
||||
console.log(`getFFmpegStream using url=${url}`)
|
||||
static getFFmpegStream({ playlistUrl }: { playlistUrl: string }): Readable {
|
||||
console.log(`getFFmpegStream using playlistUrl=${playlistUrl}`)
|
||||
const ffmpegProc = spawn('ffmpeg', [
|
||||
'-headers', `"User-Agent: ${ua0}"`,
|
||||
'-i', url,
|
||||
'-i', playlistUrl,
|
||||
'-c:v', 'copy',
|
||||
'-c:a', 'copy',
|
||||
'-movflags', 'faststart',
|
||||
|
@ -227,6 +250,14 @@ export default class RecordNextGeneration {
|
|||
return ffmpegProc.stdout
|
||||
}
|
||||
|
||||
onUploadProgress(progress: Progress) {
|
||||
if (progress?.loaded) {
|
||||
console.log(`Upload progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`)
|
||||
this.reportMemoryUsage()
|
||||
this.uploadCounter = progress.loaded
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
formatMemoryStats(stats: NodeJS.MemoryUsage): Record<string, string> {
|
||||
const formattedStats: Record<string, string> = {};
|
||||
|
@ -279,11 +310,10 @@ export default class RecordNextGeneration {
|
|||
}
|
||||
|
||||
getNames() {
|
||||
const tmpFileName = `${nanoid()}.ts`
|
||||
this.s3Key = tmpFileName
|
||||
this.tmpDiskPath = join(tmpdir(), tmpFileName)
|
||||
console.log(`tmpDiskPath=${this.tmpDiskPath}`)
|
||||
return { tmpDiskPath: this.tmpDiskPath, s3Key: this.s3Key }
|
||||
const s3Key = `${nanoid()}.ts`
|
||||
const tmpDiskPath = join(tmpdir(), s3Key)
|
||||
console.log(`tmpDiskPath=${tmpDiskPath}`)
|
||||
return { tmpDiskPath, s3Key }
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -294,44 +324,60 @@ export default class RecordNextGeneration {
|
|||
* * segment
|
||||
* * segment_vod_link
|
||||
*/
|
||||
async getDatabaseRecords() {
|
||||
this.vod = await getVod(this.vodId)
|
||||
if (!this.vod) throw new Error(`RecordNextGeneration failed to fetch vod ${this.vodId}`)
|
||||
if (this.vod.is_recording_aborted) throw new AdminAbortedError();
|
||||
// async getDatabaseRecords() {
|
||||
// this.vod = await getVod(this.vodId)
|
||||
// if (!this.vod) throw new Error(`RecordNextGeneration failed to fetch vod ${this.vodId}`);
|
||||
// if (this.vod.recording.is_aborted) throw new AdminAbortedError();
|
||||
|
||||
if (!this.s3Key) throw new Error('getDatabaseRecords() requires this.s3Key, but it was undefined.');
|
||||
this.segmentId = await createSegmentInDatabase(this.s3Key, this.vodId)
|
||||
this.segmentVodLinkId = await createSegmentsVodLink(this.vodId, this.segmentId)
|
||||
|
||||
if (!this.vod) throw new Error('after getRecords() ran, this.vod was missing.');
|
||||
if (!this.segmentId) throw new Error('after getRecords() ran, this.segmentId was missing.');
|
||||
if (!this.segmentVodLinkId) throw new Error('after getRecords() ran, this.segmentVodLinkId was missing.');
|
||||
// if (!this.s3Key) throw new Error('getDatabaseRecords() requires this.s3Key, but it was undefined.');
|
||||
// const segmentId = await createSegment(this.s3Key, this.vodId)
|
||||
// const segmentVodLinkId = await createSegmentsVodLink(this.vodId, this.segmentId)
|
||||
|
||||
// if (!this.vod) throw new Error('after getDatabaseRecords() ran, this.vod was missing.');
|
||||
// if (!segmentId) throw new Error('after getDatabaseRecords() ran, segmentId was missing.');
|
||||
// if (!segmentVodLinkId) throw new Error('after getDatabaseRecords() ran, segmentVodLinkId was missing.');
|
||||
|
||||
// return { segmentId, segmentVodLinkId }
|
||||
// }
|
||||
|
||||
|
||||
static async _dl(url: string, s3Key: string) {
|
||||
const playlistUrl = await getPlaylistUrl(url)
|
||||
if (!playlistUrl) throw new PlaylistFailedError();
|
||||
const ffmpegStream = RecordNextGeneration.getFFmpegStream({ playlistUrl })
|
||||
const diskStream = RecordNextGeneration.getDiskStream(s3Key)
|
||||
const streamPipeline = pipeline(ffmpegStream, diskStream)
|
||||
return {
|
||||
pipeline: streamPipeline,
|
||||
ffmpegStream,
|
||||
diskStream,
|
||||
}
|
||||
}
|
||||
|
||||
async download() {
|
||||
const { tmpDiskPath } = this.getNames()
|
||||
const s3Client = this.getS3Client()
|
||||
await this.getDatabaseRecords()
|
||||
static async _ul(client: S3Client, diskPath: string, key: string) {
|
||||
const diskStream = createReadStream(diskPath, { encoding: 'utf-8' })
|
||||
|
||||
this.downloadStream = RecordNextGeneration.getFFmpegStream({ url: this.playlistUrl })
|
||||
this.diskStream = createWriteStream(tmpDiskPath, { encoding: 'utf-8' })
|
||||
const params = {
|
||||
Bucket: configs.s3UscBucket,
|
||||
Key: key,
|
||||
Body: diskStream
|
||||
}
|
||||
const uploadInstance = new Upload({
|
||||
client,
|
||||
partSize: 1024 * 1024 * 5,
|
||||
queueSize: 1,
|
||||
// @see https://github.com/aws/aws-sdk-js-v3/issues/2311
|
||||
// tl;dr: the variable name, 'leavePartsOnError' is not representative of the behavior.
|
||||
// It should instead be interpreted as, 'throwOnPartsError'
|
||||
leavePartsOnError: true,
|
||||
params
|
||||
})
|
||||
|
||||
this.streamPipeline = pipeline(this.downloadStream, this.diskStream)
|
||||
|
||||
this.downloadStream.on('data', (data: any) => this.downloadCounter += data.length)
|
||||
this.downloadStream.on('close', (arg: any) => console.log(`RecordNextGeneration downloadStream close. arg=${arg}`))
|
||||
this.downloadStream.on('end', (arg: any) => console.log(`RecordNextGeneration downloadStream end. arg=${arg}`))
|
||||
this.downloadStream.on('drain', (arg: any) => console.log(`RecordNextGeneration downloadStream drain. arg=${arg}`))
|
||||
// this.downloadStream.on('pause', (arg: any) => console.log(`RecordNextGeneration downloadStream pause. arg=${arg}`))
|
||||
this.downloadStream.on('error', (arg: any) => console.log(`RecordNextGeneration downloadStream error. arg=${arg}`))
|
||||
|
||||
this.diskStream.on('close', (arg: any) => console.log(`RecordNextGeneration diskStream close. arg=${arg}`))
|
||||
this.diskStream.on('end', (arg: any) => console.log(`RecordNextGeneration diskStream end. arg=${arg}`))
|
||||
// this.diskStream.on('drain', (arg: any) => console.log(`RecordNextGeneration diskStream drain. arg=${arg}`))
|
||||
this.diskStream.on('pause', (arg: any) => console.log(`RecordNextGeneration diskStream pause. arg=${arg}`))
|
||||
this.diskStream.on('error', (arg: any) => console.log(`RecordNextGeneration diskStream error. arg=${arg}`))
|
||||
|
||||
await this.streamPipeline
|
||||
return {
|
||||
uploadInstance,
|
||||
diskStream,
|
||||
}
|
||||
}
|
||||
|
||||
async upload() {
|
||||
|
@ -343,112 +389,295 @@ export default class RecordNextGeneration {
|
|||
if (!tmpDiskPath) throw new Error('tmpDiskPath was missing during upload()');
|
||||
const fileStream = createReadStream(tmpDiskPath, { encoding: 'utf-8' })
|
||||
|
||||
// this.uploadStream = new PassThrough()
|
||||
this.uploadInstance = this.getMultipartUpload({ client: s3Client, bucket: configs.s3UscBucket, key: s3Key, body: fileStream })
|
||||
|
||||
// this.uploadInstance.on('close', (arg: any) => console.log(`RecordNextGeneration uploadStream close. arg=${arg}`))
|
||||
// this.uploadInstance.on('end', (arg: any) => console.log(`RecordNextGeneration uploadStream end. arg=${arg}`))
|
||||
// this.uploadInstance.on('drain', (arg: any) => console.log(`RecordNextGeneration uploadStream drain. arg=${arg}`))
|
||||
// this.uploadInstance.on('pause', (arg: any) => console.log(`RecordNextGeneration uploadStream pause. arg=${arg}`))
|
||||
// this.uploadInstance.on('error', (arg: any) => console.log(`RecordNextGeneration uploadStream error. arg=${arg}`))
|
||||
|
||||
await this.uploadInstance.done()
|
||||
}
|
||||
|
||||
startProgressReports() {
|
||||
/**
|
||||
* occasionalDatabaseRecordUpdater
|
||||
*
|
||||
* We need to update the segment database record so the admin UI shows the progress of the recording.
|
||||
* Here we use the byte count that was set by AWS S3 uploader to update the segment record.
|
||||
* Then, we queue another update 1 minute from the completion of this function.
|
||||
*/
|
||||
const occasionalDatabaseRecordUpdater = async () => {
|
||||
console.log(`occasionalDatabaseRecordUpdater() is running now. downloadCounter=${this.downloadCounter} (${prettyBytes(this.downloadCounter)}), uploadCounter=${this.uploadCounter} (${prettyBytes(this.uploadCounter)})`)
|
||||
this.reportMemoryUsage()
|
||||
if (this.segmentId) {
|
||||
await updateSegmentInDatabase({ segment_id: this.segmentId, fileSize: this.downloadCounter })
|
||||
}
|
||||
return setTimeout(occasionalDatabaseRecordUpdater, 60*1000)
|
||||
|
||||
/**
|
||||
* # handleExceptions
|
||||
*
|
||||
* We want to handle any exceptions that are thrown, so our process continues running.
|
||||
* Ideally we know every failure scenario and we handle it graceefully.
|
||||
* If we ever reach the default: case below, it's a bug and we need to patch it.
|
||||
*/
|
||||
handleExceptions (e: any, phase?: string) {
|
||||
console.info(`handleExceptions is called during phase=${phase} with e.name=${e.name} e instanceof Error?=${e instanceof Error} e.message=${e.message}`)
|
||||
|
||||
if (e instanceof Error && e.name === 'RoomOfflineError') {
|
||||
// if the room is offline, we re-throw the RoomOfflineError so the recording gets retried
|
||||
// we do this because the offline might be a temporary situation.
|
||||
// e.g. streamer's computer bluescreened and they're coming back after they reboot.
|
||||
// @todo try again immediately
|
||||
|
||||
} else if (e instanceof Error && e.name === 'PlaylistFailedError') {
|
||||
// sometimes @futureporn/scout fails to get the playlist URL. We want to immediately try again.
|
||||
// @todo try again immediately
|
||||
|
||||
} else if (e instanceof Error && e.name === 'AdminAbortedError') {
|
||||
// An admin aborted the recording which means we don't want to retry recording.
|
||||
// we return <void> which causes the 'record' Task to be marked as successful.
|
||||
console.log(`clear as day, that is an AdminAbortedError! ❤️`)
|
||||
return
|
||||
|
||||
} else if (e instanceof Error && e.name === 'DownloadFailedError') {
|
||||
throw e
|
||||
|
||||
} else if (e instanceof Error && e.message === 'no tomes available') {
|
||||
console.error(`Received a 'no tomes available' error from S3 which ususally means they're temporarily overloaded.`)
|
||||
throw e
|
||||
|
||||
} else if (e instanceof Error && e.name === 'UploadFailedError') {
|
||||
throw e
|
||||
|
||||
} else {
|
||||
console.error(`!!!!!!!!!!!!!! 🚩🚩🚩 handleExceptions did not find a known scenario which should probably never happen. Please patch the code to handle this scenario.`)
|
||||
console.error((e instanceof Error) ? `(e instanceof Error)=${(e instanceof Error)}, e.message='${e.message}', e.name='${e.name}'` : JSON.stringify(e))
|
||||
}
|
||||
this.databaseUpdateTimer = setTimeout(occasionalDatabaseRecordUpdater, 60*1000)
|
||||
}
|
||||
|
||||
async updateDatabaseRecords() {
|
||||
await this.updateSegmentBytes()
|
||||
await this.checkForAborted()
|
||||
}
|
||||
|
||||
async checkForAborted() {
|
||||
const recording = await getRecordingRelatedToVod(this.vodId)
|
||||
if (!recording) throw new Error(`failed to get recording related to vodId=${this.vodId}`);
|
||||
if (recording.is_aborted) {
|
||||
this.abortController.abort()
|
||||
}
|
||||
}
|
||||
|
||||
async updateSegmentBytes() {
|
||||
for (const [index, segment] of this.segments.entries()) {
|
||||
if (segment.id) {
|
||||
console.log(`updateSegmentBytes() Segment ${index} -- segment.id=${segment.id} segments.bytes=${segment.bytes}`)
|
||||
await updateSegmentInDatabase({ segment_id: segment.id, fileSize: segment.bytes })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
startProgressReports() {
|
||||
this.databaseUpdateTimer = setInterval(async () => {
|
||||
try {
|
||||
await this.updateDatabaseRecords()
|
||||
} catch (e) {
|
||||
console.error(`during startProgressReports(), we encountered the following error.`)
|
||||
console.error(e)
|
||||
}
|
||||
}, this.updateTimeout)
|
||||
}
|
||||
|
||||
stopProgressReports() {
|
||||
clearInterval(this.databaseUpdateTimer)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* done() waits for the recording to be complete.
|
||||
* isTryingDownload
|
||||
*
|
||||
* There are always more tries unless the stream has been offline for greater than 5 minutes.
|
||||
*/
|
||||
isTryingDownload() {
|
||||
const isSegmentPresent = (this.segments && this.segments?.length > 0)
|
||||
if (!isSegmentPresent) return true;
|
||||
const latestSegment = this.segments.at(-1)
|
||||
const hasUpdatedTimestamp = latestSegment?.updated_at
|
||||
if (!hasUpdatedTimestamp) throw new Error('latestSegment does not have an updated_at property');
|
||||
const fiveMinsAgo = sub(new Date(), { minutes: 5 })
|
||||
const lastUpdatedAt = latestSegment.updated_at
|
||||
return (isBefore(lastUpdatedAt, fiveMinsAgo)) ? true : false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* done()
|
||||
*
|
||||
* Repeatedly download segments until there is no more stream.
|
||||
* Stream is considered no-more once it has been offline for >5 minutes.
|
||||
* When downloading is done, upload segments to S3.
|
||||
*
|
||||
* input: stream URL, such as 'https://chaturbate.com/projektmelody'
|
||||
* ouptut: list of S3Files, such as [{ key: 'example1.ts' }, { key: 'example2.ts' }]
|
||||
*/
|
||||
async done() {
|
||||
|
||||
|
||||
|
||||
this.startProgressReports();
|
||||
try {
|
||||
this.startProgressReports()
|
||||
await this.download()
|
||||
|
||||
|
||||
await this.downloadSegments();
|
||||
await this.uploadSegments();
|
||||
} catch (e) {
|
||||
|
||||
switch (e) {
|
||||
|
||||
|
||||
case (e instanceof Error && e.name === 'RoomOfflineError'):
|
||||
// if the room is offline, we re-throw the RoomOfflineError so the recording gets retried
|
||||
// we do this because the offline might be a temporary situation.
|
||||
// e.g. streamer's computer bluescreened and they're coming back after they reboot.
|
||||
throw e
|
||||
|
||||
case (e instanceof Error && e.name === 'AdminAbortedError'):
|
||||
// An admin aborted which means we don't want to retry.
|
||||
// we return and the Task gets marked as successful.
|
||||
return
|
||||
|
||||
case (e instanceof Error && e.name === 'DownloadFailedError'):
|
||||
throw e
|
||||
|
||||
default:
|
||||
console.error(`!!!!!!!!!!!!!! switch/case (download section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
|
||||
console.error(`!!!!!!!!!!!!!! switch/case (download section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
|
||||
console.error(`!!!!!!!!!!!!!! switch/case (download section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
|
||||
console.error((e instanceof Error) ? e.message : JSON.stringify(e))
|
||||
}
|
||||
|
||||
|
||||
|
||||
console.error(`An error was encountered during done() function. This should not happen under nominal scenarios. This may be a bug; please investigate.`)
|
||||
throw e
|
||||
} finally {
|
||||
// download is done, so we upload the segment to S3.
|
||||
this.stopProgressReports();
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await this.upload()
|
||||
/**
|
||||
* downloadSegment
|
||||
*
|
||||
* Download a single segment.
|
||||
* * Creates segment in the database
|
||||
* * Pushes the segment to this.segments
|
||||
*/
|
||||
async downloadSegment() {
|
||||
const s3_key = `${nanoid()}.ts`
|
||||
const segment = await createSegment(s3_key, this.vodId)
|
||||
if (!segment.id) {
|
||||
throw new Error(`Failed to createSegment(). segment.id was missing.`);
|
||||
}
|
||||
console.log(`New segment created. @see http://localhost:9000/segments?id=eq.${segment.id}&select=*,vods(*,recordings(*))`)
|
||||
this.segments.push(segment)
|
||||
const { pipeline, ffmpegStream } = (await RecordNextGeneration._dl(this.url, s3_key))
|
||||
if (this.downloadStream) throw new Error(`If you see this error, there is a bug in your code. downloadSegment() tried to use this.downloadStream but it was already being used by some other part of the code. Please refactor so this.downloadStream is not used by more than one function at any given time.`);
|
||||
this.downloadStream = ffmpegStream
|
||||
ffmpegStream.on('data', (data) => {
|
||||
let mySegment = this.segments.find((s) => s.id === segment.id)
|
||||
if (mySegment) {
|
||||
mySegment.bytes += data.length;
|
||||
}
|
||||
})
|
||||
await pipeline
|
||||
delete this.downloadStream // cleanup so another iteration can use
|
||||
}
|
||||
|
||||
} catch (e) {
|
||||
|
||||
switch (e) {
|
||||
|
||||
case (e instanceof Error && e.message === 'no tomes available'):
|
||||
console.error(`Received a 'no tomes available' error from S3 which ususally means they're temporarily overloaded.`)
|
||||
throw e
|
||||
|
||||
case (e instanceof Error && e.name === 'UploadFailedError'):
|
||||
throw e
|
||||
|
||||
|
||||
default:
|
||||
console.error(`!!!!!!!!!!!!!! switch/case (upload section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
|
||||
console.error(`!!!!!!!!!!!!!! switch/case (upload section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
|
||||
console.error(`!!!!!!!!!!!!!! switch/case (upload section) defaulted which should probably never happen. Please patch the code to handle this scenario.`)
|
||||
console.error((e instanceof Error) ? e.message : JSON.stringify(e))
|
||||
/**
|
||||
* downloadSegments
|
||||
*
|
||||
* Fault-tolerant segment downloader.
|
||||
* * Creates segments in the database.
|
||||
* * Handles common errors
|
||||
* * Retries until the stream has been offline for >5 minutes.
|
||||
* * Recursively called
|
||||
*/
|
||||
async downloadSegments(): Promise<void> {
|
||||
try {
|
||||
await this.downloadSegment()
|
||||
} catch (e) {
|
||||
if (e instanceof Error && e.name === 'RoomOfflineError') {
|
||||
// If the room is offline, then we want to retry immediately.
|
||||
// We do this because the offline room might be a temporary situation.
|
||||
// e.g. streamer's computer bluescreened and they're coming back after they reboot.
|
||||
// If the room has been offline for >5 minutes, then we consider the stream concluded and we return.
|
||||
if (this.isTryingDownload()) {
|
||||
return this.downloadSegments()
|
||||
} else {
|
||||
return
|
||||
}
|
||||
} finally {
|
||||
// @todo - [ ] send S3 complete upload command if necessary
|
||||
|
||||
console.info(`finally block. Cleaning up.`)
|
||||
this.stopProgressReports()
|
||||
} else if (e instanceof Error && e.name === 'PlaylistFailedError') {
|
||||
// sometimes @futureporn/scout fails to get the playlist URL. We want to immediately try again.
|
||||
return this.downloadSegments()
|
||||
|
||||
} else if (e instanceof Error && e.name === 'AdminAbortedError') {
|
||||
// An admin aborted the recording which means we don't want to retry recording.
|
||||
// we return <void> which causes the 'record' Task to be marked as successful.
|
||||
console.log(`Clear as day, that is an AdminAbortedError! ❤️`)
|
||||
return
|
||||
|
||||
} else if (e instanceof Error && e.name === 'DownloadFailedError') {
|
||||
console.error(`We encountered a DownloadFailedError. I'm unsure why this happens. I guess we will retry.`)
|
||||
return this.downloadSegments()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* uploadSegments
|
||||
*
|
||||
* Fault-tolerant segment uploader.
|
||||
* * Uploads local segment files to S3
|
||||
* * Handles common errors
|
||||
* * Retries each segment up to 9 times
|
||||
*/
|
||||
async uploadSegments() {
|
||||
try {
|
||||
for (const segment of this.segments) {
|
||||
await this.uploadSegment(segment)
|
||||
}
|
||||
} catch (e) {
|
||||
console.error('error during uploadSegments(). error as follows.')
|
||||
console.error(e)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
async uploadSegment(segment: SegmentResponse) {
|
||||
const diskPath = join(tmpdir(), segment.s3_key)
|
||||
const key = segment.s3_key
|
||||
const client = this.getS3Client()
|
||||
await pRetry(async (attemptCount: number) => {
|
||||
console.log(`uploadSegment() attempt ${attemptCount}`)
|
||||
if (!this.s3Client) throw new Error('S3Client')
|
||||
const { uploadInstance } = (await RecordNextGeneration._ul(client, diskPath, key))
|
||||
uploadInstance.on('httpUploadProgress', () => this.onUploadProgress)
|
||||
return uploadInstance.done()
|
||||
}, {
|
||||
onFailedAttempt: (e) => {
|
||||
console.error(`failed to uploadSegment() with the following error. Retrying.`)
|
||||
console.error(e)
|
||||
},
|
||||
retries: 9
|
||||
})
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// async done_old() {
|
||||
|
||||
// /**
|
||||
// * Errors thrown inside the setTimeout callback will end up as an uncaughtException.
|
||||
// * We handle those errors here to prevent node from exiting.
|
||||
// */
|
||||
// process.on('uncaughtException', (e) => {
|
||||
// this.stopProgressReports()
|
||||
// console.log(`!!! 🚩 WE HAVE CAUGHT AN UNCAUGHT EXCEPTION. (This should never occur. This is probably a bug that needs to be fixed.) error as follows.`)
|
||||
// console.log(e)
|
||||
// process.exit(69)
|
||||
// })
|
||||
|
||||
// try {
|
||||
// const client = this.getS3Client()
|
||||
|
||||
|
||||
// console.log(`>> 1. Segment downloading phase`)
|
||||
// while (this.isTryingDownload()) {
|
||||
// if (!this.databaseUpdateTimer) this.startProgressReports();
|
||||
|
||||
// const s3_key = `${nanoid()}.ts`
|
||||
// const segment = await createSegment(s3_key, this.vodId)
|
||||
// if (!segment.id) {
|
||||
// console.log('the following is segment')
|
||||
// console.log(segment)
|
||||
// throw new Error(`received invalid segment from db fetch()`);
|
||||
// }
|
||||
// this.segments.push(segment)
|
||||
// console.log(`New segment created. @see http://localhost:9000/segments?id=eq.${segment.id}&select=*,vods(*,recordings(*))`)
|
||||
|
||||
// const { pipeline, ffmpegStream } = (await RecordNextGeneration._dl(this.url, s3_key))
|
||||
// this.downloadStream = ffmpegStream
|
||||
// ffmpegStream.on('data', (data) => {
|
||||
// let mSegment = this.segments.find((s) => s.id === segment.id)
|
||||
// if (mSegment) {
|
||||
// mSegment.bytes += data.length;
|
||||
// }
|
||||
// })
|
||||
// await pipeline
|
||||
// }
|
||||
|
||||
// console.log(`>> 2. Segment uploading phase`)
|
||||
|
||||
|
||||
|
||||
// } catch (e) {
|
||||
// this.stopProgressReports()
|
||||
// return this.handleExceptions(e, '_dl()|_ul()')
|
||||
|
||||
|
||||
// }
|
||||
// }
|
||||
|
||||
// }
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
import { setInterval, setTimeout } from 'node:timers/promises';
|
||||
|
||||
async function task() {
|
||||
console.log('Task is running:', new Date().toISOString());
|
||||
await setTimeout(1000); // Simulating async work with 1 second delay
|
||||
console.log('Task completed:', new Date().toISOString());
|
||||
}
|
||||
|
||||
async function main() {
|
||||
console.log('Main function started');
|
||||
|
||||
// Run the async function on an interval
|
||||
(async () => {
|
||||
for await (const _ of setInterval(5000)) { // 5 seconds interval
|
||||
task(); // Running async task without waiting for it to complete
|
||||
}
|
||||
})();
|
||||
|
||||
// Non-blocking logic in the main function with a for loop
|
||||
for (let i = 1; i <= 5; i++) {
|
||||
console.log(`Main function loop iteration ${i}`);
|
||||
await setTimeout(2000); // Simulate async work (non-blocking) with 2 seconds delay
|
||||
}
|
||||
|
||||
console.log('Main function loop completed');
|
||||
}
|
||||
|
||||
main();
|
|
@ -1,97 +1,48 @@
|
|||
|
||||
|
||||
import updateSegmentInDatabase from '@futureporn/fetchers/updateSegmentInDatabase.ts'
|
||||
import { Helpers, type Task } from 'graphile-worker'
|
||||
import Record from '../Record.ts'
|
||||
import type { SegmentResponse } from '@futureporn/types'
|
||||
import { configs } from '../config.ts'
|
||||
import { createId } from '@paralleldrive/cuid2'
|
||||
import createSegmentInDatabase from '@futureporn/fetchers/createSegmentInDatabase.ts'
|
||||
import createSegmentsVodLink from '@futureporn/fetchers/createSegmentsVodLink.ts'
|
||||
import getPlaylistUrl from '@futureporn/fetchers/getPlaylistUrl.ts'
|
||||
import getVod from '@futureporn/fetchers/getVod.ts'
|
||||
import RecordNextGeneration from '../RecordNextGeneration.ts'
|
||||
import createVod from '@futureporn/fetchers/createVod.ts'
|
||||
import findOrCreateStream from '@futureporn/fetchers/findOrCreateStream.ts'
|
||||
import findOrCreateVtuber from '@futureporn/fetchers/findOrCreateVtuber.ts'
|
||||
import getRecording from '@futureporn/fetchers/getRecording.ts'
|
||||
import patchRecording from '@futureporn/fetchers/patchRecording.ts'
|
||||
|
||||
|
||||
/**
|
||||
* url is the URL to be recorded. Ex: chaturbate.com/projektmelody
|
||||
* recordId is the ID of the record record in postgres
|
||||
* we use the ID to poll the db to see if the job is aborted by the user
|
||||
*/
|
||||
interface Payload {
|
||||
url: string;
|
||||
vod_id: string;
|
||||
recording_id: string;
|
||||
// url: string;
|
||||
// discord_message_id?: string;
|
||||
// vod_id?: string;
|
||||
// date?: string;
|
||||
}
|
||||
|
||||
|
||||
|
||||
function assertPayload(payload: any): asserts payload is Payload {
|
||||
if (typeof payload !== "object" || !payload) throw new Error("invalid payload");
|
||||
if (typeof payload.url !== "string") throw new Error("invalid url");
|
||||
if (typeof payload.vod_id !== "string") throw new Error(`invalid vod_id=${payload.vod_id}`);
|
||||
if (typeof payload !== "object" || !payload) throw new Error("invalid payload");
|
||||
if (typeof payload.recording_id !== "string") throw new Error("invalid recording_id");
|
||||
}
|
||||
|
||||
async function assertVod(payload: Payload) {
|
||||
|
||||
let { recording_id } = payload
|
||||
const recording = await getRecording(recording_id)
|
||||
if (!recording) throw new Error(`failed to getRecording id=${recording_id}`);
|
||||
let { url, vod_id, date } = recording
|
||||
if (vod_id) return { vodId: vod_id };
|
||||
if (!date) date = new Date().toISOString();
|
||||
|
||||
const vtuberId = await findOrCreateVtuber({ url })
|
||||
const streamId = await findOrCreateStream({ vtuberId, date: new Date(date) })
|
||||
if (!streamId) throw new Error(`failed to find or create a Stream in database`);
|
||||
const vod = await createVod({ stream_id: streamId, vtuber: vtuberId, url, date, recording_id })
|
||||
if (!vod) throw new Error('failed to createVod. please try again.')
|
||||
|
||||
return { vodId: vod.id }
|
||||
}
|
||||
|
||||
|
||||
// async function getRecordInstance(url: string, segment_id: string, helpers: Helpers) {
|
||||
// helpers.logger.info(`getRecordInstance() with url=${url}, segment_id=${segment_id}`)
|
||||
// const abortController = new AbortController()
|
||||
// const abortSignal = abortController.signal
|
||||
// const accessKeyId = configs.s3AccessKeyId;
|
||||
// const secretAccessKey = configs.s3SecretAccessKey;
|
||||
// const region = configs.s3Region;
|
||||
// const endpoint = configs.s3Endpoint;
|
||||
// const bucket = configs.s3UscBucket;
|
||||
// const playlistUrl = await getPlaylistUrl(url)
|
||||
// if (!playlistUrl) throw new Error('failed to getPlaylistUrl');
|
||||
// helpers.logger.info(`playlistUrl=${playlistUrl}`)
|
||||
// const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint })
|
||||
// const inputStream = Record.getFFmpegStream({ url: playlistUrl })
|
||||
// const onProgress = (fileSize: number) => {
|
||||
// updateSegmentInDatabase({ segment_id, fileSize, helpers })
|
||||
// .then(checkIfAborted)
|
||||
// .then((isAborted) => {
|
||||
// isAborted ? abortController.abort() : null
|
||||
// })
|
||||
// .catch((e) => {
|
||||
// helpers.logger.error('caught error while updatingDatabaseRecord inside onProgress inside getRecordInstance')
|
||||
// helpers.logger.error(e)
|
||||
// })
|
||||
// }
|
||||
|
||||
// const record = new Record({ inputStream, onProgress, bucket, s3Client, jobId: ''+segment_id, abortSignal })
|
||||
// return record
|
||||
// }
|
||||
|
||||
// function checkIfAborted(segment: Partial<SegmentResponse>): boolean {
|
||||
// return (!!segment?.vod?.is_recording_aborted)
|
||||
// }
|
||||
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* # doRecordSegment
|
||||
*
|
||||
* Record a segment of a livestream using ffmpeg.
|
||||
*
|
||||
* Ideally, we record the entire livestream, but the universe is not so kind. Network interruptions are common, so we handle the situation as best as we can.
|
||||
*
|
||||
* This function creates a new segments and vods_segments_links entry in the db via Postgrest REST API.
|
||||
*
|
||||
* This function also names the S3 file (s3_key) with a datestamp and a cuid.
|
||||
*/
|
||||
// const doRecordSegment = async function doRecordSegment(url: string, vod_id: string, helpers: Helpers) {
|
||||
// const s3_key = `${new Date().toISOString()}-${createId()}.ts`
|
||||
// helpers.logger.info(`let's create a segment using vod_id=${vod_id}, url=${url}`)
|
||||
// const segment_id = await createSegmentInDatabase(s3_key, vod_id)
|
||||
// helpers.logger.info(`let's create a segmentsStreamLink...`)
|
||||
// const segmentsVodLinkId = await createSegmentsVodLink(vod_id, segment_id)
|
||||
// helpers.logger.info(`doTheRecording with createSegmentsVodLink segmentsVodLinkId=${segmentsVodLinkId}, vod_id=${vod_id}, segment_id=${segment_id}, url=${url}`)
|
||||
// // no try-catch block here, because we need any errors to bubble up.
|
||||
// const record = await getRecordInstance(url, segment_id)
|
||||
// helpers.logger.info(`we got a Record instance. now we record.start()`)
|
||||
// // console.log(record)
|
||||
// return record.start()
|
||||
// }
|
||||
|
||||
|
||||
|
||||
|
@ -99,65 +50,23 @@ export const record: Task = async function (payload: unknown, helpers: Helpers)
|
|||
|
||||
|
||||
assertPayload(payload)
|
||||
const { url, vod_id } = payload
|
||||
const vodId = vod_id
|
||||
|
||||
const playlistUrl = await getPlaylistUrl(url)
|
||||
if (!playlistUrl) throw new Error(`failed to get playlistUrl using url=${url}`)
|
||||
const { recording_id: recordingId } = payload
|
||||
const recording = await getRecording(recordingId)
|
||||
if (!recording) throw new Error(`failed to getRecording() ${recordingId}`);
|
||||
const { url } = recording
|
||||
const { vodId } = await assertVod(payload)
|
||||
// await patchRecording(recording.id, { vod_id: vodId })
|
||||
|
||||
/**
|
||||
* RecordNextGeneration handles errors for us and re-throws ones that should fail the Task.
|
||||
* We intentionally do not use a try/catch block here.
|
||||
*/
|
||||
const recordNG = new RecordNextGeneration({ playlistUrl, vodId })
|
||||
const recordNG = new RecordNextGeneration({ url, vodId })
|
||||
await recordNG.done()
|
||||
|
||||
|
||||
|
||||
return;
|
||||
|
||||
|
||||
// try {
|
||||
// // if the VOD has been aborted, end Task with success
|
||||
// if ((await getVod(vod_id, helpers))?.is_recording_aborted) return;
|
||||
|
||||
// /**
|
||||
// * We do an exponential backoff timer when we record. If the Record() instance throws an error, we try again after a delay.
|
||||
// * This will take effect only when Record() throws an error.
|
||||
// * If however Record() returns, as is the case when the stream ends, this backoff timer will not retry.
|
||||
// * This does not handle the corner case where the streamer's internet temporarliy goes down, and their stream drops.
|
||||
// *
|
||||
// * @todo We must implement retrying at a higher level, and retry a few times to handle this type of corner-case.
|
||||
// */
|
||||
// // await backOff(() => doRecordSegment(url, recordId, helpers))
|
||||
// await doRecordSegment(url, vodId, helpers)
|
||||
// } catch (e) {
|
||||
// // await updateDatabaseRecord({ recordId: vod_id, recordingState: 'failed' })
|
||||
// helpers.logger.error(`caught an error during record Task`)
|
||||
// if (e instanceof Error) {
|
||||
// helpers.logger.info(`error.name=${e.name}`)
|
||||
// if (e.name === 'RoomOfflineError') {
|
||||
// // If room is offline, we want to retry until graphile-worker retries expire.
|
||||
// // We don't want to swallow the error so we simply log the error then let the below throw re-throw the error
|
||||
// // graphile-worker will retry when we re-throw the error below.
|
||||
// helpers.logger.info(`Room is offline.`)
|
||||
// } else if (e.name === 'AbortError') {
|
||||
// // If the recording was aborted by an admin, we want graphile-worker to stop retrying the record job.
|
||||
// // We swallow the error and return in order to mark the job as succeeded.
|
||||
// helpers.logger.info(`>>> we got an AbortError so we are ending the record job.`)
|
||||
// return
|
||||
// } else {
|
||||
// helpers.logger.error(e.message)
|
||||
// }
|
||||
// } else {
|
||||
// helpers.logger.error(JSON.stringify(e))
|
||||
// }
|
||||
// // we throw the error which fails the graphile-worker job, thus causing graphile-worker to restart/retry the job.
|
||||
// helpers.logger.error(`we got an error during record Task so we throw and retry`)
|
||||
// throw e
|
||||
// }
|
||||
|
||||
// helpers.logger.info('record Task has finished')
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -2,4 +2,10 @@
|
|||
|
||||
Factory takes raw materials (video segments) and produces an end product (encoded video, thumbnail)
|
||||
|
||||
factory has a big disk and lots of RAM in order to do transcoding tasks
|
||||
factory has a big disk and lots of RAM in order to do transcoding tasks
|
||||
|
||||
|
||||
|
||||
## 240p encodes
|
||||
|
||||
ffmpeg -i ./pmel-cb-2023-03-04.mp4 -vf scale=-2:240 -b:v 368k -b:a 45k ./projektmelody-chaturbatep2023-03-04_240p.mp4
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
-- recordings table schema
|
||||
CREATE TABLE api.recordings (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
url TEXT NOT NULL,
|
||||
created_at timestamp(6) without time zone,
|
||||
updated_at timestamp(6) without time zone,
|
||||
discord_message_id TEXT
|
||||
);
|
||||
|
||||
-- roles & permissions for our backend automation user
|
||||
GRANT all ON api.recordings TO automation;
|
||||
GRANT SELECT ON api.recordings TO web_anon;
|
||||
|
||||
|
||||
-- we re-create this function to use recording_id instead of vod_id
|
||||
DROP FUNCTION public.tg__add_record_job CASCADE;
|
||||
CREATE FUNCTION public.tg__add_record_job() RETURNS trigger
|
||||
LANGUAGE plpgsql SECURITY DEFINER
|
||||
SET search_path TO 'pg_catalog', 'public', 'pg_temp'
|
||||
AS $$
|
||||
begin
|
||||
PERFORM graphile_worker.add_job('record', json_build_object(
|
||||
'url', NEW.url,
|
||||
'recording_id', NEW.id
|
||||
), max_attempts := 6);
|
||||
return NEW;
|
||||
end;
|
||||
$$;
|
||||
|
||||
|
||||
|
||||
CREATE TRIGGER create_recording
|
||||
AFTER UPDATE ON api.recordings
|
||||
FOR EACH ROW
|
||||
EXECUTE FUNCTION tg__add_record_job();
|
|
@ -0,0 +1,6 @@
|
|||
|
||||
ALTER TABLE IF EXISTS api.recordings
|
||||
ALTER COLUMN created_at SET DEFAULT now();
|
||||
|
||||
ALTER TABLE IF EXISTS api.recordings
|
||||
ALTER COLUMN updated_at SET DEFAULT now();
|
|
@ -0,0 +1,8 @@
|
|||
-- create discord_interactions table
|
||||
CREATE TABLE api.discord_interactions (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
discord_message_id TEXT NOT NULL
|
||||
);
|
||||
GRANT all ON api.discord_interactions TO automation;
|
||||
GRANT SELECT ON api.discord_interactions TO web_anon;
|
||||
|
|
@ -0,0 +1,14 @@
|
|||
|
||||
-- add more cols to api.recordings
|
||||
ALTER TABLE api.recordings
|
||||
ADD COLUMN date TIMESTAMP(6) WITHOUT TIME ZONE;
|
||||
|
||||
ALTER TABLE api.recordings
|
||||
ADD COLUMN vod_id UUID REFERENCES api.vods(id);
|
||||
|
||||
ALTER TABLE api.recordings
|
||||
DROP COLUMN discord_message_id;
|
||||
|
||||
ALTER TABLE api.recordings
|
||||
ADD COLUMN discord_interaction_id UUID REFERENCES api.discord_interactions(id);
|
||||
|
|
@ -0,0 +1,8 @@
|
|||
|
||||
DROP TRIGGER create_recording ON api.recordings;
|
||||
|
||||
CREATE TRIGGER recording_create
|
||||
AFTER INSERT ON api.recordings
|
||||
FOR EACH ROW
|
||||
EXECUTE PROCEDURE public.tg__add_record_job('record');
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
-- move is_aborted COL from vods to recordings
|
||||
ALTER TABLE api.vods
|
||||
DROP COLUMN is_recording_aborted;
|
||||
|
||||
ALTER TABLE api.recordings
|
||||
ADD COLUMN is_aborted BOOLEAN NOT NULL DEFAULT FALSE;
|
|
@ -0,0 +1,3 @@
|
|||
|
||||
ALTER TABLE api.vods
|
||||
ADD COLUMN segments UUID REFERENCES api.segments(id);
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
|
||||
we already have a many-to-one relation. this col is creating a one-to-many relation, causing the following problem.
|
||||
|
||||
{
|
||||
"code": "PGRST201",
|
||||
"details": [
|
||||
{
|
||||
"cardinality": "one-to-many",
|
||||
"embedding": "segments with vods",
|
||||
"relationship": "vods_segments_fkey using segments(id) and vods(segments)"
|
||||
},
|
||||
{
|
||||
"cardinality": "many-to-one",
|
||||
"embedding": "segments with vods",
|
||||
"relationship": "segments_vod_id_fkey using segments(vod_id) and vods(id)"
|
||||
}
|
||||
],
|
||||
"hint": "Try changing 'vods' to one of the following: 'vods!vods_segments_fkey', 'vods!segments_vod_id_fkey'. Find the desired relationship in the 'details' key.",
|
||||
"message": "Could not embed because more than one relationship was found for 'segments' and 'vods'"
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
|
||||
|
||||
|
||||
ALTER TABLE api.vods
|
||||
DROP COLUMN segments;
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
|
||||
we already have a many-to-one relation. this col is creating a one-to-many relation, causing the following problem.
|
||||
|
||||
{
|
||||
"code": "PGRST201",
|
||||
"details": [
|
||||
{
|
||||
"cardinality": "one-to-many",
|
||||
"embedding": "segments with vods",
|
||||
"relationship": "vods_segments_fkey using segments(id) and vods(segments)"
|
||||
},
|
||||
{
|
||||
"cardinality": "many-to-one",
|
||||
"embedding": "segments with vods",
|
||||
"relationship": "segments_vod_id_fkey using segments(vod_id) and vods(id)"
|
||||
}
|
||||
],
|
||||
"hint": "Try changing 'vods' to one of the following: 'vods!vods_segments_fkey', 'vods!segments_vod_id_fkey'. Find the desired relationship in the 'details' key.",
|
||||
"message": "Could not embed because more than one relationship was found for 'segments' and 'vods'"
|
||||
}
|
||||
|
||||
*/
|
||||
|
||||
|
||||
ALTER TABLE api.recordings
|
||||
DROP COLUMN vod_id;
|
|
@ -0,0 +1,3 @@
|
|||
|
||||
ALTER TABLE api.recordings
|
||||
ADD COLUMN vod_id UUID REFERENCES api.vods(id);
|
|
@ -0,0 +1,3 @@
|
|||
|
||||
ALTER TABLE api.vods
|
||||
ADD COLUMN recording_id UUID REFERENCES api.recordings(id);
|
|
@ -0,0 +1,4 @@
|
|||
|
||||
-- potentially unecessary fk, because api.vods has the relation too.
|
||||
ALTER TABLE api.recordings
|
||||
DROP COLUMN vod_id;
|
|
@ -32,7 +32,7 @@ export class RoomOfflineError extends Error {
|
|||
|
||||
export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = 0): Promise<string|null> {
|
||||
console.log(`getPlaylistUrl roomUrl=${roomUrl} proxy=${false} retries=${retries}`)
|
||||
let args = ['-g', roomUrl]
|
||||
let args = ['-4', '-g', roomUrl]
|
||||
if (proxy) {
|
||||
console.log(`proxy=${proxy}, HTTP_PROXY=${configs.httpProxy}`)
|
||||
args = args.concat(['--proxy', configs.httpProxy])
|
||||
|
|
Loading…
Reference in New Issue