refactor for simpler control flow
ci / build (push) Failing after 1s

This commit is contained in:
CJ_Clippy 2024-09-25 18:01:25 -08:00
parent 92cce429b3
commit 5af878c3a3
10 changed files with 375 additions and 161 deletions

View File

@@ -532,6 +532,10 @@ k8s_resource(
labels=['backend'],
resource_deps=['postgrest', 'postgresql-primary'],
)
k8s_resource(
workload='chihaya',
labels=['backend']
)
k8s_resource(
workload='postgrest',
port_forwards=['9000'],

View File

@@ -0,0 +1,30 @@
import { SegmentResponse } from '@futureporn/types';
import { configs } from './config.ts'
export default async function createSegment(s3_key: string, vod_id: string): Promise<SegmentResponse> {
if (!s3_key) throw new Error('createSegment requires {string} s3_key as first arg');
const segmentPayload = {
s3_key,
vod_id
}
const res = await fetch(`${configs.postgrestUrl}/segments`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Prefer': 'return=representation',
'Authorization': `Bearer ${configs.automationUserJwt}`
},
body: JSON.stringify(segmentPayload)
})
if (!res.ok) {
const body = await res.text()
const msg = `failed to create Segment. status=${res.status}, statusText=${res.statusText}, body=${body}`
console.error(msg)
throw new Error(msg);
}
const data = await res.json() as SegmentResponse[]
if (!data[0]) throw new Error('failed to createSegment! data[0] was missing.');
return data[0]
}
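For context, a hedged usage sketch of the new fetcher (the s3_key and vod_id arguments are hypothetical). Note the design change: createSegment reads the created row straight out of the Prefer: return=representation response body, rather than parsing a Location header the way the deleted createSegmentInDatabase below did.

import createSegment from '@futureporn/fetchers/createSegment.ts'

// hypothetical arguments, for illustration only
const segment = await createSegment('example-segment.ts', '42')
console.log(`created segment id=${segment.id} s3_key=${segment.s3_key}`)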

View File

@@ -1,37 +0,0 @@
import { configs } from './config.ts'
import querystring from 'node:querystring'
export default async function createSegmentInDatabase(s3_key: string, vod_id: string): Promise<string> {
if (!s3_key) throw new Error('getSegments requires {string} s3_key as first arg');
const segmentPayload = {
s3_key,
vod_id
}
// helpers.logger.info(`Creating segment with s3_key=${s3_key}. payload as follows`)
// helpers.logger.info(JSON.stringify(segmentPayload))
const res = await fetch(`${configs.postgrestUrl}/segments`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Prefer': 'return=headers-only',
'Authorization': `Bearer ${configs.automationUserJwt}`
},
body: JSON.stringify(segmentPayload)
})
if (!res.ok) {
const body = await res.text()
const msg = `failed to create Segment. status=${res.status}, statusText=${res.statusText}, body=${body}`
console.error(msg)
throw new Error(msg);
}
const location = res.headers.get('location')
if (!location) throw new Error(`failed to get location header in response from postgrest`);
const parsedQuery = querystring.parse(location)
const segmentsId = parsedQuery['/segments?id']
if (!segmentsId) throw new Error('segmentsId was undefined which is unexpected');
if (Array.isArray(segmentsId)) throw new Error('segmentsId was an array which is unexpected');
const id = segmentsId.split('.').at(-1)
if (!id) throw new Error('failed to get id ');
return id
}

View File

@@ -22,7 +22,7 @@ export default async function updateSegmentInDatabase({
}
const fetchUrl = `${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=vod:vods(recording:recordings(is_aborted))`
console.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`)
// console.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`)
const res = await fetch(fetchUrl, {
method: 'PATCH',
headers: {

View File

@@ -222,7 +222,7 @@ export interface RecordingRecord {
export interface SegmentResponse {
id: number;
id: string;
s3_key: string;
s3_id: string;
bytes: number;

View File

@@ -30,7 +30,7 @@ describe('image', function () {
describe('getStoryboard', function () {
this.timeout(1000*60*15)
it('should accept a URL and return a path to image on disk', async function () {
const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-09-19.mp4'
const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-09-25.mp4'
const imagePath = await getStoryboard(url)
expect(imagePath).to.match(/\.png/)
})

View File

@@ -2,7 +2,7 @@ Task names use underscores because graphile_worker expects them to be that way
here are some administrative functions for clearing all tasks. Also see https://worker.graphile.org/docs/admin-functions
(search tags as follows because I keep losing this file)
(search tags, for easily finding this file by content)
administrative tasks
clear all
delete all
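A minimal sketch of the "clear all" idea, assuming a default graphile_worker schema (the jobs table name has changed across graphile-worker versions, so verify against the admin-functions docs linked above first):

import { Pool } from 'pg'

const pool = new Pool({ connectionString: process.env.DATABASE_URL })
// Danger: permanently removes every queued job. Assumes a default graphile_worker install.
await pool.query('DELETE FROM graphile_worker.jobs')
await pool.end()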

View File

@@ -1,9 +1,5 @@
/**
* RecordNextGeneration.ts
*
* @important @todo There is a MEMORY LEAK in here somewhere!
* This causes OOMKiller to cull the capture process. Not good!
*
*/
import { VodResponse } from "@futureporn/types"
@@ -21,13 +17,17 @@ import { S3Client, type S3ClientConfig } from '@aws-sdk/client-s3'
import prettyBytes from "pretty-bytes"
import updateSegmentInDatabase from "@futureporn/fetchers/updateSegmentInDatabase.ts"
import { getRecordingRelatedToVod } from "@futureporn/fetchers/getRecording.ts"
import createSegmentInDatabase from "@futureporn/fetchers/createSegmentInDatabase.ts"
import createSegment from "@futureporn/fetchers/createSegment.ts"
import createSegmentsVodLink from "@futureporn/fetchers/createSegmentsVodLink.ts"
import { createReadStream, createWriteStream } from "fs"
import pRetry, {AbortError} from 'p-retry'
import { type SegmentResponse } from '@futureporn/types'
import getPlaylistUrl from "@futureporn/fetchers/getPlaylistUrl.ts"
import { isBefore, sub } from 'date-fns'
export interface RecordNextGenerationArguments {
vodId: string;
playlistUrl: string;
url: string;
}
export class AdminAbortedError extends Error {
@@ -55,6 +55,20 @@
}
}
export class PlaylistFailedError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, PlaylistFailedError.prototype)
this.name = this.constructor.name
this.message = `PlaylistFailedError. ${this.message}`
}
getErrorMessage() {
return this.message
}
}
export class DownloadFailedError extends Error {
constructor(message?: string) {
super(message)
@@ -69,22 +83,33 @@ export class DownloadFailedError extends Error {
/**
* RecordNextGeneration
* # RecordNextGeneration
*
* The function which records VODs in a Futureporn specific way.
* The function which records VODs in a Futureporn specific, fault-tolerant way.
*
* Issues
* ## Issues/TODO list
*
* @todo [x] onProgress stops firing
* @todo [ ] OOMKilled seen via development environment
* @todo [x] OOMKilled seen via development environment
* @todo [ ] undefined behavior during CB private shows
* @todo [ ] does not handle CB Hidden Camera ticket shows
* @todo [ ] Upload segments in a way that does not interrupt downloading new segments.
* There is an issue where a segment download ends, and the segment upload immediately begins.
* At first glance this looks like good behavior, but what is happening during the segment upload is that the livestream
* is continuing, but we aren't recording it anymore. We are using Backblaze, thus uploads are slow.
* We miss a lot of the stream because the upload takes many minutes.
* Instead of this behavior of immediately uploading after a segment download completes, we should upload once the livestream is finished,
* OR we should upload while concurrently downloading the next segment.
* @todo [ ] Move retrying from the {Task} `record` context to the class `RecordNextGeneration` context.
* There is an issue where the `record` task needs to retry after a temporary failure, but it cannot because there aren't any available workers.
* The solution is to not exit the `record` task at all, and instead keep the `record` task running, but suspended while an exponential backoff timer elapses.
* This way, the worker stays focused on the recording and retries until the stream has been offline for n minutes, at which point `record` is complete.
*
*/
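As a sketch of the backoff idea in the last @todo above (names are illustrative, not part of this commit), the record task could stay alive and suspend between attempts instead of failing the job:

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

async function recordWithBackoff(tryOnce: () => Promise<void>, maxAttempts = 10): Promise<void> {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await tryOnce() // stream concluded normally
    } catch (e) {
      const delay = Math.min(2 ** attempt * 1000, 60_000) // 2s, 4s, 8s, ... capped at 60s
      console.warn(`attempt ${attempt} failed; retrying in ${delay}ms`)
      await sleep(delay)
    }
  }
  throw new Error('stream did not come back; giving up')
}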
export default class RecordNextGeneration {
public vodId: string;
public segmentId?: string;
public segmentVodLinkId?: string;
public playlistUrl: string;
public url: string;
public s3Key?: string;
public s3Bucket?: string;
public s3Client?: S3Client;
@@ -93,24 +118,36 @@ export default class RecordNextGeneration {
private downloadStream?: Readable;
private uploadStream?: PassThrough;
private uploadInstance?: Upload;
private streamPipeline?: Promise<void>;
private diskStream?: Writable;
private uploadCounter: number;
private downloadCounter: number;
private databaseUpdateTimer?: NodeJS.Timeout;
private updateTimeout: number;
private abortController: AbortController;
private segments: SegmentResponse[];
private retries: number;
constructor({ vodId, playlistUrl }: RecordNextGenerationArguments) {
constructor({ vodId, url }: RecordNextGenerationArguments) {
this.vodId = vodId
this.playlistUrl = playlistUrl
this.url = url
this.uploadCounter = 0
this.downloadCounter = 0
this.updateTimeout = 30*1000
this.abortController = new AbortController()
this.abortController.signal.addEventListener("abort", this.abortEventListener.bind(this))
this.retries = 0
this.segments = []
}
async withRetry(fn: any, retries = 3) {
return pRetry(fn, {
onFailedAttempt: (e) => {
console.error(`Error during attempt:`, e);
},
retries
});
}
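// Hypothetical usage of withRetry (not part of this commit):
//   await this.withRetry(() => createSegment(s3Key, this.vodId), 3)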
abortEventListener() {
@@ -189,13 +226,16 @@ export default class RecordNextGeneration {
// maybe @see https://github.com/aws/aws-sdk-js-v3/issues/2694
static getDiskStream(s3Key?: string) {
const tmpDiskPath = join(tmpdir(), s3Key || `${nanoid()}.ts`)
return createWriteStream(tmpDiskPath) // binary video data; no text encoding
}
static getFFmpegStream({ url }: { url: string }): Readable {
console.log(`getFFmpegStream using url=${url}`)
static getFFmpegStream({ playlistUrl }: { playlistUrl: string }): Readable {
console.log(`getFFmpegStream using playlistUrl=${playlistUrl}`)
const ffmpegProc = spawn('ffmpeg', [
'-headers', `User-Agent: ${ua0}`, // spawn passes args verbatim (no shell), so literal quotes would end up inside the header value
'-i', url,
'-i', playlistUrl,
'-c:v', 'copy',
'-c:a', 'copy',
'-movflags', 'faststart',
@@ -210,6 +250,14 @@ export default class RecordNextGeneration {
return ffmpegProc.stdout
}
onUploadProgress(progress: Progress) {
if (progress?.loaded) {
console.log(`Upload progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`)
this.reportMemoryUsage()
this.uploadCounter = progress.loaded
}
}
formatMemoryStats(stats: NodeJS.MemoryUsage): Record<string, string> {
const formattedStats: Record<string, string> = {};
@@ -262,11 +310,10 @@
}
getNames() {
const tmpFileName = `${nanoid()}.ts`
this.s3Key = tmpFileName
this.tmpDiskPath = join(tmpdir(), tmpFileName)
console.log(`tmpDiskPath=${this.tmpDiskPath}`)
return { tmpDiskPath: this.tmpDiskPath, s3Key: this.s3Key }
const s3Key = `${nanoid()}.ts`
const tmpDiskPath = join(tmpdir(), s3Key)
console.log(`tmpDiskPath=${tmpDiskPath}`)
return { tmpDiskPath, s3Key }
}
/**
@@ -277,45 +324,60 @@
* * segment
* * segment_vod_link
*/
async getDatabaseRecords() {
this.vod = await getVod(this.vodId)
if (!this.vod) throw new Error(`RecordNextGeneration failed to fetch vod ${this.vodId}`);
if (this.vod.recording.is_aborted) throw new AdminAbortedError();
// async getDatabaseRecords() {
// this.vod = await getVod(this.vodId)
// if (!this.vod) throw new Error(`RecordNextGeneration failed to fetch vod ${this.vodId}`);
// if (this.vod.recording.is_aborted) throw new AdminAbortedError();
if (!this.s3Key) throw new Error('getDatabaseRecords() requires this.s3Key, but it was undefined.');
this.segmentId = await createSegmentInDatabase(this.s3Key, this.vodId)
this.segmentVodLinkId = await createSegmentsVodLink(this.vodId, this.segmentId)
// if (!this.s3Key) throw new Error('getDatabaseRecords() requires this.s3Key, but it was undefined.');
// const segmentId = await createSegment(this.s3Key, this.vodId)
// const segmentVodLinkId = await createSegmentsVodLink(this.vodId, this.segmentId)
if (!this.vod) throw new Error('after getRecords() ran, this.vod was missing.');
if (!this.segmentId) throw new Error('after getRecords() ran, this.segmentId was missing.');
if (!this.segmentVodLinkId) throw new Error('after getRecords() ran, this.segmentVodLinkId was missing.');
// if (!this.vod) throw new Error('after getDatabaseRecords() ran, this.vod was missing.');
// if (!segmentId) throw new Error('after getDatabaseRecords() ran, segmentId was missing.');
// if (!segmentVodLinkId) throw new Error('after getDatabaseRecords() ran, segmentVodLinkId was missing.');
// return { segmentId, segmentVodLinkId }
// }
static async _dl(url: string, s3Key: string) {
const playlistUrl = await getPlaylistUrl(url)
if (!playlistUrl) throw new PlaylistFailedError();
const ffmpegStream = RecordNextGeneration.getFFmpegStream({ playlistUrl })
const diskStream = RecordNextGeneration.getDiskStream(s3Key)
const streamPipeline = pipeline(ffmpegStream, diskStream)
return {
pipeline: streamPipeline,
ffmpegStream,
diskStream,
}
}
async download() {
const { tmpDiskPath } = this.getNames()
const s3Client = this.getS3Client()
await this.getDatabaseRecords()
static async _ul(client: S3Client, diskPath: string, key: string) {
const diskStream = createReadStream(diskPath) // read raw bytes; a utf-8 encoding here would corrupt the binary upload
this.downloadStream = RecordNextGeneration.getFFmpegStream({ url: this.playlistUrl })
this.diskStream = createWriteStream(tmpDiskPath, { encoding: 'utf-8' })
const params = {
Bucket: configs.s3UscBucket,
Key: key,
Body: diskStream
}
const uploadInstance = new Upload({
client,
partSize: 1024 * 1024 * 5,
queueSize: 1,
// @see https://github.com/aws/aws-sdk-js-v3/issues/2311
// tl;dr: the variable name, 'leavePartsOnError' is not representative of the behavior.
// It should instead be interpreted as, 'throwOnPartsError'
leavePartsOnError: true,
params
})
this.streamPipeline = pipeline(this.downloadStream, this.diskStream)
this.downloadStream.on('data', (data: any) => this.downloadCounter += data.length)
this.downloadStream.on('close', (arg: any) => console.log(`RecordNextGeneration downloadStream close. arg=${arg}`))
this.downloadStream.on('end', (arg: any) => console.log(`RecordNextGeneration downloadStream end. arg=${arg}`))
this.downloadStream.on('drain', (arg: any) => console.log(`RecordNextGeneration downloadStream drain. arg=${arg}`))
// this.downloadStream.on('pause', (arg: any) => console.log(`RecordNextGeneration downloadStream pause. arg=${arg}`))
this.downloadStream.on('error', (arg: any) => console.log(`RecordNextGeneration downloadStream error. arg=${arg}`))
this.diskStream.on('close', (arg: any) => console.log(`RecordNextGeneration diskStream close. arg=${arg}`))
this.diskStream.on('end', (arg: any) => console.log(`RecordNextGeneration diskStream end. arg=${arg}`))
// this.diskStream.on('drain', (arg: any) => console.log(`RecordNextGeneration diskStream drain. arg=${arg}`))
this.diskStream.on('pause', (arg: any) => console.log(`RecordNextGeneration diskStream pause. arg=${arg}`))
this.diskStream.on('error', (arg: any) => console.log(`RecordNextGeneration diskStream error. arg=${arg}`))
await this.streamPipeline
return {
uploadInstance,
diskStream,
}
}
async upload() {
@@ -327,15 +389,8 @@
if (!tmpDiskPath) throw new Error('tmpDiskPath was missing during upload()');
const fileStream = createReadStream(tmpDiskPath) // raw bytes; no text encoding on binary video data
// this.uploadStream = new PassThrough()
this.uploadInstance = this.getMultipartUpload({ client: s3Client, bucket: configs.s3UscBucket, key: s3Key, body: fileStream })
// this.uploadInstance.on('close', (arg: any) => console.log(`RecordNextGeneration uploadStream close. arg=${arg}`))
// this.uploadInstance.on('end', (arg: any) => console.log(`RecordNextGeneration uploadStream end. arg=${arg}`))
// this.uploadInstance.on('drain', (arg: any) => console.log(`RecordNextGeneration uploadStream drain. arg=${arg}`))
// this.uploadInstance.on('pause', (arg: any) => console.log(`RecordNextGeneration uploadStream pause. arg=${arg}`))
// this.uploadInstance.on('error', (arg: any) => console.log(`RecordNextGeneration uploadStream error. arg=${arg}`))
await this.uploadInstance.done()
}
@@ -351,32 +406,35 @@
console.info(`handleExceptions is called during phase=${phase} with e.name=${e.name} e instanceof Error?=${e instanceof Error} e.message=${e.message}`)
if (e instanceof Error && e.name === 'RoomOfflineError') {
// if the room is offline, we re-throw the RoomOfflineError so the recording gets retried
// we do this because the offline might be a temporary situation.
// e.g. streamer's computer bluescreened and they're coming back after they reboot.
throw e
// if the room is offline, we re-throw the RoomOfflineError so the recording gets retried
// we do this because the offline might be a temporary situation.
// e.g. streamer's computer bluescreened and they're coming back after they reboot.
// @todo try again immediately
} else if (e instanceof Error && e.name === 'PlaylistFailedError') {
// sometimes @futureporn/scout fails to get the playlist URL. We want to immediately try again.
// @todo try again immediately
} else if (e instanceof Error && e.name === 'AdminAbortedError') {
// An admin aborted the recording which means we don't want to retry recording.
// we return <void> which causes the 'record' Task to be marked as successful.
console.log(`clear as day, that is an AdminAbortedError! ❤️`)
return
// An admin aborted the recording which means we don't want to retry recording.
// we return <void> which causes the 'record' Task to be marked as successful.
console.log(`clear as day, that is an AdminAbortedError! ❤️`)
return
} else if (e instanceof Error && e.name === 'DownloadFailedError') {
throw e
throw e
} else if (e instanceof Error && e.message === 'no tomes available') {
console.error(`Received a 'no tomes available' error from S3 which usually means they're temporarily overloaded.`)
throw e
console.error(`Received a 'no tomes available' error from S3 which usually means they're temporarily overloaded.`)
throw e
} else if (e instanceof Error && e.name === 'UploadFailedError') {
throw e
throw e
} else {
console.error(`!!!!!!!!!!!!!! 🚩🚩🚩 handleExceptions did not find a known scenario which should probably never happen. Please patch the code to handle this scenario.`)
console.error((e instanceof Error) ? `(e instanceof Error)=${(e instanceof Error)}, e.message='${e.message}', e.name='${e.name}'` : JSON.stringify(e))
console.error(`!!!!!!!!!!!!!! 🚩🚩🚩 handleExceptions did not find a known scenario which should probably never happen. Please patch the code to handle this scenario.`)
console.error((e instanceof Error) ? `(e instanceof Error)=${(e instanceof Error)}, e.message='${e.message}', e.name='${e.name}'` : JSON.stringify(e))
}
}
async updateDatabaseRecords() {
@@ -393,9 +451,11 @@
}
async updateSegmentBytes() {
console.log(`updateSegmentBytes() start with this.segmentId=${this.segmentId} this.downloadCounter=${this.downloadCounter}`)
if (this.segmentId) {
await updateSegmentInDatabase({ segment_id: this.segmentId, fileSize: this.downloadCounter })
for (const [index, segment] of this.segments.entries()) {
if (segment.id) {
console.log(`updateSegmentBytes() Segment ${index} -- segment.id=${segment.id} segments.bytes=${segment.bytes}`)
await updateSegmentInDatabase({ segment_id: segment.id, fileSize: segment.bytes })
}
}
}
@@ -415,48 +475,209 @@
}
/**
* done() waits for the recording to be complete.
* isTryingDownload
*
* There are always more tries unless the stream has been offline for more than 5 minutes.
*/
isTryingDownload() {
const isSegmentPresent = (this.segments && this.segments?.length > 0)
if (!isSegmentPresent) return true;
const latestSegment = this.segments.at(-1)
const hasUpdatedTimestamp = latestSegment?.updated_at
if (!hasUpdatedTimestamp) throw new Error('latestSegment does not have an updated_at property');
const fiveMinsAgo = sub(new Date(), { minutes: 5 })
const lastUpdatedAt = latestSegment.updated_at
// a newest segment updated more than five minutes ago means the stream is gone, so stop trying
return !isBefore(new Date(lastUpdatedAt), fiveMinsAgo)
}
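// For clarity (matching the docstring above): sub(new Date(), { minutes: 5 }) is the instant
// five minutes ago, and isBefore(lastUpdatedAt, fiveMinsAgo) is true when the newest segment
// is stale (>5 minutes old), so we keep trying only while that check is false.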
/**
* done()
*
* Repeatedly download segments until there is no more stream.
* The stream is considered concluded once it has been offline for >5 minutes.
* When downloading is done, upload segments to S3.
*
* input: stream URL, such as 'https://chaturbate.com/projektmelody'
* output: list of S3Files, such as [{ key: 'example1.ts' }, { key: 'example2.ts' }]
*/
async done() {
/**
* Errors thrown inside the setTimeout callback will end up as an uncaughtException.
* We handle those errors here to prevent node from exiting.
*/
process.on('uncaughtException', (e) => {
console.log(`!!! 🚩 WE HAVE CAUGHT AN UNCAUGHT EXCEPTION. (This should never occur. This is probably a bug that needs to be fixed.) error as follows.`)
console.log(e)
process.exit(69)
})
this.startProgressReports();
try {
this.startProgressReports()
await this.download()
await this.downloadSegments();
await this.uploadSegments();
} catch (e) {
return this.handleExceptions(e, 'download')
console.error(`An error was encountered during done(). This should not happen under nominal conditions. It may be a bug; please investigate.`)
throw e
} finally {
console.info(`🏁 finally block (👇 download phase)`)
// download is done, so we upload the segment to S3.
this.stopProgressReports();
}
}
try {
await this.upload()
/**
* downloadSegment
*
* Download a single segment.
* * Creates segment in the database
* * Pushes the segment to this.segments
*/
async downloadSegment() {
const s3_key = `${nanoid()}.ts`
const segment = await createSegment(s3_key, this.vodId)
if (!segment.id) {
throw new Error(`Failed to createSegment(). segment.id was missing.`);
}
console.log(`New segment created. @see http://localhost:9000/segments?id=eq.${segment.id}&select=*,vods(*,recordings(*))`)
this.segments.push(segment)
const { pipeline, ffmpegStream } = (await RecordNextGeneration._dl(this.url, s3_key))
if (this.downloadStream) throw new Error(`If you see this error, there is a bug in your code. downloadSegment() tried to use this.downloadStream but it was already being used by some other part of the code. Please refactor so this.downloadStream is not used by more than one function at any given time.`);
this.downloadStream = ffmpegStream
ffmpegStream.on('data', (data) => {
let mySegment = this.segments.find((s) => s.id === segment.id)
if (mySegment) {
mySegment.bytes += data.length;
}
})
await pipeline
delete this.downloadStream // cleanup so the next iteration can use it
}
} catch (e) {
return this.handleExceptions(e, 'upload')
} finally {
// @todo - [ ] send S3 complete upload command if necessary
/**
* downloadSegments
*
* Fault-tolerant segment downloader.
* * Creates segments in the database.
* * Handles common errors
* * Retries until the stream has been offline for >5 minutes.
* * Recursively called
*/
async downloadSegments(): Promise<void> {
try {
await this.downloadSegment()
} catch (e) {
if (e instanceof Error && e.name === 'RoomOfflineError') {
// If the room is offline, then we want to retry immediately.
// We do this because the offline room might be a temporary situation.
// e.g. streamer's computer bluescreened and they're coming back after they reboot.
// If the room has been offline for >5 minutes, then we consider the stream concluded and we return.
if (this.isTryingDownload()) {
return this.downloadSegments()
} else {
return
}
console.info(`🏁 finally block (👆 upload phase)`)
this.stopProgressReports()
} else if (e instanceof Error && e.name === 'PlaylistFailedError') {
// sometimes @futureporn/scout fails to get the playlist URL. We want to immediately try again.
return this.downloadSegments()
} else if (e instanceof Error && e.name === 'AdminAbortedError') {
// An admin aborted the recording which means we don't want to retry recording.
// we return <void> which causes the 'record' Task to be marked as successful.
console.log(`Clear as day, that is an AdminAbortedError! ❤️`)
return
} else if (e instanceof Error && e.name === 'DownloadFailedError') {
console.error(`We encountered a DownloadFailedError. I'm unsure why this happens. I guess we will retry.`)
return this.downloadSegments()
} else {
// Unknown error: re-throw so done() surfaces it instead of silently swallowing it.
throw e
}
}
}
/**
* uploadSegments
*
* Fault-tolerant segment uploader.
* * Uploads local segment files to S3
* * Handles common errors
* * Retries each segment up to 9 times
*/
async uploadSegments() {
try {
for (const segment of this.segments) {
await this.uploadSegment(segment)
}
} catch (e) {
console.error('error during uploadSegments(). error as follows.')
console.error(e)
throw e
}
}
async uploadSegment(segment: SegmentResponse) {
const diskPath = join(tmpdir(), segment.s3_key)
const key = segment.s3_key
const client = this.getS3Client()
await pRetry(async (attemptCount: number) => {
console.log(`uploadSegment() attempt ${attemptCount}`)
if (!client) throw new Error('S3Client was missing during uploadSegment()');
const { uploadInstance } = (await RecordNextGeneration._ul(client, diskPath, key))
uploadInstance.on('httpUploadProgress', (progress: Progress) => this.onUploadProgress(progress))
return uploadInstance.done()
}, {
onFailedAttempt: (e) => {
console.error(`failed to uploadSegment() with the following error. Retrying.`)
console.error(e)
},
retries: 9
})
}
}
// async done_old() {
// /**
// * Errors thrown inside the setTimeout callback will end up as an uncaughtException.
// * We handle those errors here to prevent node from exiting.
// */
// process.on('uncaughtException', (e) => {
// this.stopProgressReports()
// console.log(`!!! 🚩 WE HAVE CAUGHT AN UNCAUGHT EXCEPTION. (This should never occur. This is probably a bug that needs to be fixed.) error as follows.`)
// console.log(e)
// process.exit(69)
// })
// try {
// const client = this.getS3Client()
// console.log(`>> 1. Segment downloading phase`)
// while (this.isTryingDownload()) {
// if (!this.databaseUpdateTimer) this.startProgressReports();
// const s3_key = `${nanoid()}.ts`
// const segment = await createSegment(s3_key, this.vodId)
// if (!segment.id) {
// console.log('the following is segment')
// console.log(segment)
// throw new Error(`received invalid segment from db fetch()`);
// }
// this.segments.push(segment)
// console.log(`New segment created. @see http://localhost:9000/segments?id=eq.${segment.id}&select=*,vods(*,recordings(*))`)
// const { pipeline, ffmpegStream } = (await RecordNextGeneration._dl(this.url, s3_key))
// this.downloadStream = ffmpegStream
// ffmpegStream.on('data', (data) => {
// let mSegment = this.segments.find((s) => s.id === segment.id)
// if (mSegment) {
// mSegment.bytes += data.length;
// }
// })
// await pipeline
// }
// console.log(`>> 2. Segment uploading phase`)
// } catch (e) {
// this.stopProgressReports()
// return this.handleExceptions(e, '_dl()|_ul()')
// }
// }
// }

View File

@@ -1,7 +1,6 @@
import { Helpers, type Task } from 'graphile-worker'
import getPlaylistUrl from '@futureporn/fetchers/getPlaylistUrl.ts'
import RecordNextGeneration from '../RecordNextGeneration.ts'
import createVod from '@futureporn/fetchers/createVod.ts'
import findOrCreateStream from '@futureporn/fetchers/findOrCreateStream.ts'
@@ -58,14 +57,11 @@ export const record: Task = async function (payload: unknown, helpers: Helpers)
const { vodId } = await assertVod(payload)
// await patchRecording(recording.id, { vod_id: vodId })
const playlistUrl = await getPlaylistUrl(url)
if (!playlistUrl) throw new Error(`failed to get playlistUrl using url=${url}`);
/**
* RecordNextGeneration handles errors for us and re-throws ones that should fail the Task.
* We intentionally do not use a try/catch block here.
*/
const recordNG = new RecordNextGeneration({ playlistUrl, vodId })
const recordNG = new RecordNextGeneration({ url, vodId })
await recordNG.done()

View File

@@ -32,7 +32,7 @@ export class RoomOfflineError extends Error {
export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = 0): Promise<string|null> {
console.log(`getPlaylistUrl roomUrl=${roomUrl} proxy=${proxy} retries=${retries}`)
let args = ['-g', roomUrl]
let args = ['-4', '-g', roomUrl] // -4 forces IPv4; -g prints the media URL without downloading
if (proxy) {
console.log(`proxy=${proxy}, HTTP_PROXY=${configs.httpProxy}`)
args = args.concat(['--proxy', configs.httpProxy])
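For reference, a minimal sketch of consuming a -g invocation, assuming yt-dlp is the binary behind this wrapper (-4 forces IPv4 and -g prints the resolved media URL without downloading; both are standard yt-dlp flags):

import { execFile } from 'node:child_process'
import { promisify } from 'node:util'

const exec = promisify(execFile)

// hypothetical room URL, for illustration only
const { stdout } = await exec('yt-dlp', ['-4', '-g', 'https://chaturbate.com/example'])
console.log(stdout.trim()) // the HLS playlist URL (one line per requested format)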