import { createId } from '@paralleldrive/cuid2' import { spawn } from 'child_process'; import { ua0 } from '@futureporn/scout/ua.js' import { PassThrough, pipeline, Readable } from 'stream'; import prettyBytes from 'pretty-bytes'; import { Upload } from "@aws-sdk/lib-storage"; import { S3Client } from "@aws-sdk/client-s3"; import 'dotenv/config' export interface RecordArgs { filename?: string; channel: string; s3Client: S3Client; bucket: string; date?: string; inputStream: Readable; } interface MakeS3ClientOptions { accessKeyId: string; secretAccessKey: string; region: string; endpoint: string } interface getFFmpegDownloadOptions { url: string; } export default class Record { readonly id: string; private s3Client: S3Client; private uploadStream: PassThrough; private ticker?: NodeJS.Timeout; inputStream: Readable; counter: number; bucket: string; keyName: string; datestamp: string; filename?: string; channel: string; date?: string; constructor({ inputStream, channel, s3Client, bucket }: RecordArgs) { if (!inputStream) throw new Error('Record constructor was missing inputStream.'); if (!bucket) throw new Error('Record constructor was missing bucket.'); if (!channel) throw new Error('Record constructer was missing channel!'); if (!s3Client) throw new Error('Record constructer was missing s3Client'); this.inputStream = inputStream this.id = createId() this.s3Client = s3Client this.bucket = bucket this.channel = channel this.counter = 0 this.datestamp = new Date().toISOString() this.keyName = `${this.datestamp}-${channel}-${createId()}.ts` this.uploadStream = new PassThrough() } makeProgressTicker() { this.ticker = setInterval(() => { console.log(`[progress] ${this.counter} bytes (aggregate) (${prettyBytes(this.counter)}) have passed through the pipeline.`) }, 1000 * 30) } static makeS3Client({ accessKeyId, secretAccessKey, region, endpoint }: MakeS3ClientOptions): S3Client { const client = new S3Client({ endpoint, region, credentials: { accessKeyId, secretAccessKey } }) return client } static getFFmpegDownload({ url }: getFFmpegDownloadOptions): Readable { const ffmpegProc = spawn('ffmpeg', [ '-headers', `"User-Agent: ${ua0}"`, '-i', url, '-c:v', 'copy', '-c:a', 'copy', '-movflags', 'faststart', '-y', '-f', 'mpegts', '-loglevel', 'quiet', 'pipe:1' ], { // ignoring stderr is important because if not, ffmpeg will fill that buffer and node will hang stdio: ['pipe', 'pipe', 'ignore'] }) return ffmpegProc.stdout } async uploadToS3() { const target = { Bucket: this.bucket, Key: this.keyName, // We do this to keep TS happy. Body expects a Readable, not a ReadableStream nor a NodeJS.ReadableStream // Body: new Readable().wrap(this.uploadStream) Body: this.uploadStream } // greets https://stackoverflow.com/a/70159394/1004931 try { const parallelUploads3 = new Upload({ client: this.s3Client, partSize: 1024 * 1024 * 5, queueSize: 1, leavePartsOnError: false, params: target, }); parallelUploads3.on("httpUploadProgress", (progress) => { console.log(progress) if (progress?.loaded) { console.log(`loaded ${progress.loaded} bytes (${prettyBytes(progress.loaded)})`); } else { console.log(`httpUploadProgress ${JSON.stringify(progress, null, 2)}`) } }); await parallelUploads3.done(); } catch (e) { if (e instanceof Error) { console.error(`while uploading a file to s3, we encountered an error`) throw new Error(e.message); } else { throw new Error(`error of some sort ${JSON.stringify(e, null, 2)}`) } } } async start() { this.makeProgressTicker() // streams setup this.uploadStream.on('data', (data) => { this.counter += data.length }) // stream pipeline setup pipeline( this.inputStream, this.uploadStream, (err) => { if (err) { console.error(`pipeline errored.`) console.error(err) } else { console.log('pipeline succeeded.') } } ) await this.uploadToS3() clearInterval(this.ticker) return { id: this.id, keyName: this.keyName, channel: this.channel } } async stop() { throw new Error('@todo please implement') } }