import { spawn } from 'child_process';
import { PassThrough, pipeline, Readable } from 'stream';
import prettyBytes from 'pretty-bytes';
import { Upload } from '@aws-sdk/lib-storage';
import { S3Client } from '@aws-sdk/client-s3';
import 'dotenv/config';

const ua0 = 'Mozilla/5.0 (X11; Linux x86_64; rv:105.0) Gecko/20100101 Firefox/105.0';

export interface RecordArgs {
  filename?: string;
  s3Client: S3Client;
  bucket: string;
  date?: string;
  inputStream: Readable;
  jobId: string;
}

interface MakeS3ClientOptions {
  accessKeyId: string;
  secretAccessKey: string;
  region: string;
  endpoint: string;
}

interface GetFFmpegOptions {
  url: string;
}

export default class Record {
  private s3Client: S3Client;
  private uploadStream: PassThrough;
  inputStream: Readable;
  counter: number;
  bucket: string;
  keyName: string;
  datestamp: string;
  filename?: string;
  jobId: string;
  date?: string;

  constructor({ inputStream, s3Client, bucket, jobId }: RecordArgs) {
    if (!inputStream) throw new Error('Record constructor was missing inputStream.');
    if (!bucket) throw new Error('Record constructor was missing bucket.');
    if (!jobId) throw new Error('Record constructor was missing jobId.');
    if (!s3Client) throw new Error('Record constructor was missing s3Client.');
    this.inputStream = inputStream;
    this.s3Client = s3Client;
    this.bucket = bucket;
    this.jobId = jobId;
    this.counter = 0;
    this.datestamp = new Date().toISOString();
    this.keyName = `${this.datestamp}-${jobId}.ts`;
    this.uploadStream = new PassThrough();
  }

  static makeS3Client({ accessKeyId, secretAccessKey, region, endpoint }: MakeS3ClientOptions): S3Client {
    const client = new S3Client({
      endpoint,
      region,
      credentials: {
        accessKeyId,
        secretAccessKey
      }
    });
    return client;
  }

  static getFFmpegStream({ url }: GetFFmpegOptions): Readable {
    console.log(`getFFmpegStream using url=${url}`);
    // spawn() passes arguments to ffmpeg directly (no shell), so the -headers
    // value must not be wrapped in extra quotes; ffmpeg also expects header
    // lines to be CRLF-terminated.
    const ffmpegProc = spawn('ffmpeg', [
      '-headers', `User-Agent: ${ua0}\r\n`,
      '-i', url,
      '-c:v', 'copy',
      '-c:a', 'copy',
      '-y',
      '-f', 'mpegts',
      '-loglevel', 'quiet',
      'pipe:1'
    ], {
      // Ignoring stderr is important: if we don't, ffmpeg fills that buffer
      // and node hangs.
      stdio: ['pipe', 'pipe', 'ignore']
    });
    return ffmpegProc.stdout;
  }

  async uploadToS3() {
    const target = {
      Bucket: this.bucket,
      Key: this.keyName,
      Body: this.uploadStream
    };

    // greets https://stackoverflow.com/a/70159394/1004931
    try {
      const parallelUploads3 = new Upload({
        client: this.s3Client,
        partSize: 1024 * 1024 * 5,
        queueSize: 1,
        leavePartsOnError: false,
        params: target,
      });

      parallelUploads3.on('httpUploadProgress', (progress) => {
        if (progress?.loaded) {
          console.log(`loaded ${progress.loaded} bytes (${prettyBytes(progress.loaded)})`);
        } else {
          console.log(`httpUploadProgress ${JSON.stringify(progress, null, 2)}`);
        }
      });

      console.log('awaiting parallelUploads3.done()...');
      await parallelUploads3.done();
      console.log('parallelUploads3.done() is complete.');
    } catch (e) {
      if (e instanceof Error) {
        // Log the Error object directly; JSON.stringify(e) would print "{}"
        // because Error properties are non-enumerable.
        console.error('We were uploading a file to S3 but encountered an error!');
        console.error(e);
        throw e;
      } else {
        throw new Error(`uploadToS3 failed with a non-Error value: ${JSON.stringify(e, null, 2)}`);
      }
    }
  }

  async start() {
    // upload stream event handlers
    this.uploadStream.on('data', (data) => {
      this.counter += data.length;
      // Log roughly once per MiB: this fires whenever the running total lands
      // within 1 KiB past a MiB boundary, so the exact cadence depends on
      // chunk sizes.
      if (this.counter % (1024 * 1024) <= 1024) {
        console.log(`Received ${this.counter} bytes (${prettyBytes(this.counter)})`);
      }
    });
    this.uploadStream.on('close', () => {
      console.log('[!!!] upload stream has closed');
    });
    this.uploadStream.on('error', (e) => {
      console.error('there was an error on the uploadStream. error as follows');
      console.error(e);
    });

    // input stream event handlers
    this.inputStream.on('close', () => {
      console.log('[!!!] input stream has closed.');
    });
    this.inputStream.on('error', (e) => {
      console.error('there was an error on the inputStream. error as follows');
      console.error(e);
    });

    // Pipe the ffmpeg stream into the S3 upload stream.
    // This uploads the stream to S3 at the same time we're recording it.
    pipeline(
      this.inputStream,
      this.uploadStream,
      (err) => {
        if (err) {
          console.error('pipeline errored.');
          console.error(err);
        } else {
          console.log('pipeline succeeded.');
        }
      }
    );

    console.log('awaiting uploadToS3()...');
    await this.uploadToS3();
    console.log('uploadToS3() is complete.');
    return { jobId: this.jobId, keyName: this.keyName };
  }

  async stop() {
    throw new Error('@todo please implement');
  }
}
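
// A minimal usage sketch, kept as a comment since this module only exports the
// class. The env var names, bucket, jobId, and URL below are placeholders I'm
// assuming for illustration; none of them come from this module.
//
//   const s3Client = Record.makeS3Client({
//     accessKeyId: process.env.S3_ACCESS_KEY_ID!,
//     secretAccessKey: process.env.S3_SECRET_ACCESS_KEY!,
//     region: process.env.S3_REGION!,
//     endpoint: process.env.S3_ENDPOINT!
//   });
//   const inputStream = Record.getFFmpegStream({ url: 'https://example.com/stream.m3u8' });
//   const record = new Record({ inputStream, s3Client, bucket: 'my-recordings', jobId: 'job-1' });
//   const { keyName } = await record.start(); // resolves once the upload completes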