import { spawn } from 'child_process';
import { EventEmitter, PassThrough, pipeline, Readable } from 'stream';
import prettyBytes from 'pretty-bytes';
import { Upload } from "@aws-sdk/lib-storage";
import { S3Client } from "@aws-sdk/client-s3";
import 'dotenv/config'

/** User-Agent header value that ffmpeg sends when it fetches the stream URL. */
const ua0 = 'Mozilla/5.0 (X11; Linux x86_64; rv:105.0) Gecko/20100101 Firefox/105.0'

/**
 * Raised when the S3 upload stream closes while the input stream is still
 * open, which suggests the S3 upload failed part-way through a recording.
 */
export class UploadStreamClosedError extends Error {
  constructor(message: string) {
    super(message)
    // Restore the prototype chain so `instanceof` works when compiled to ES5.
    Object.setPrototypeOf(this, UploadStreamClosedError.prototype)
    this.name = 'UploadStreamClosedError'
  }
}

/** Constructor arguments for {@link Record}. */
export interface RecordArgs {
  filename?: string;
  s3Client: S3Client;
  bucket: string;
  date?: string;
  inputStream: Readable;
  jobId: string;
  abortSignal: AbortSignal;
  onProgress: (fileSize: number) => void;
}

/** Connection settings consumed by {@link Record.makeS3Client}. */
interface MakeS3ClientOptions {
  accessKeyId: string;
  secretAccessKey: string;
  region: string;
  endpoint: string
}

/** Options consumed by {@link Record.getFFmpegStream}. */
interface getFFmpegOptions {
  url: string;
}

/**
 * Streams a readable input (typically ffmpeg's stdout) into an S3 object
 * while the input is still being produced.
 *
 * The input is piped through an internal PassThrough which is handed to a
 * multipart `Upload`. Aborting the supplied `abortSignal` destroys the input
 * stream, which tears the pipeline down.
 */
export default class Record {
  private s3Client: S3Client;
  private uploadStream: PassThrough;
  inputStream: Readable;
  /** Running total of bytes that have passed through the upload stream. */
  counter: number;
  bucket: string;
  /** S3 object key, built from the construction timestamp and the job id. */
  keyName: string;
  datestamp: string;
  filename?: string;
  jobId: string;
  date?: string;
  abortSignal: AbortSignal;
  onProgress: RecordArgs['onProgress'];

  /**
   * @throws Error when `inputStream`, `bucket`, `jobId`, `s3Client` or
   *   `abortSignal` is missing.
   */
  constructor({ inputStream, s3Client, bucket, jobId, abortSignal, onProgress }: RecordArgs) {
    if (!inputStream) throw new Error('Record constructor was missing inputStream.');
    if (!bucket) throw new Error('Record constructor was missing bucket.');
    if (!jobId) throw new Error('Record constructer was missing jobId!');
    if (!s3Client) throw new Error('Record constructer was missing s3Client');
    if (!abortSignal) throw new Error('Record constructer was missing abortSignal');
    this.inputStream = inputStream
    this.onProgress = onProgress
    this.s3Client = s3Client
    this.bucket = bucket
    this.jobId = jobId
    this.counter = 0
    this.datestamp = new Date().toISOString()
    this.keyName = `${this.datestamp}-${jobId}.ts`
    this.uploadStream = new PassThrough()
    this.abortSignal = abortSignal
    // A signal aborts at most once, so let the listener unregister itself.
    this.abortSignal.addEventListener("abort", this.abortEventListener.bind(this), { once: true })
  }

  /**
   * Builds an S3 client for the given endpoint and static credentials.
   *
   * @returns A new `S3Client` instance.
   */
  static makeS3Client({ accessKeyId, secretAccessKey, region, endpoint }: MakeS3ClientOptions): S3Client {
    const client = new S3Client({
      endpoint,
      region,
      credentials: {
        accessKeyId,
        secretAccessKey
      }
    })
    return client
  }

  /**
   * Spawns ffmpeg to copy the stream at `url` into an MPEG-TS byte stream.
   *
   * @param url - Source URL passed to ffmpeg's `-i`.
   * @returns ffmpeg's stdout. If the process cannot be spawned, the stream is
   *   destroyed with the spawn error.
   */
  static getFFmpegStream({ url }: getFFmpegOptions): Readable {
    console.log(`getFFmpegStream using url=${url}`)
    const ffmpegProc = spawn('ffmpeg', [
      // spawn() does not go through a shell, so the value must not be wrapped
      // in quotes (they would be sent as part of the header). ffmpeg expects
      // each custom header to be CRLF-terminated.
      '-headers', `User-Agent: ${ua0}\r\n`,
      '-i', url,
      '-c:v', 'copy',
      '-c:a', 'copy',
      '-movflags', 'faststart',
      '-y',
      '-f', 'mpegts',
      '-loglevel', 'quiet',
      'pipe:1'
    ], {
      // ignoring stderr is important because if not, ffmpeg will fill that buffer and node will hang
      stdio: ['pipe', 'pipe', 'ignore']
    })
    // Without a listener, a spawn failure (e.g. ffmpeg not installed) is an
    // uncaught exception. Surface it on the returned stream instead.
    ffmpegProc.on('error', (err) => {
      console.error('ffmpeg process emitted an error. error as follows')
      console.error(err)
      ffmpegProc.stdout?.destroy(err)
    })
    return ffmpegProc.stdout
  }

  /** Abort handler: logs the signal and destroys the input stream with the abort reason. */
  abortEventListener() {
    console.log(`abortEventListener has been invoked. this.abortSignal is as follows`)
    console.log(this.abortSignal)
    console.log(JSON.stringify(this.abortSignal, null, 2))
    const reason = this.abortSignal.reason
    console.log(`aborted the stream download with reason=${reason}`)
    this.inputStream.destroy(new Error(reason))
  }

  /**
   * Uploads everything that flows through the internal upload stream to
   * `bucket`/`keyName` as a multipart upload (5 MiB parts, one at a time).
   *
   * Resolves when the upload completes, or when it fails with an error named
   * `AbortError`. Any other failure is rethrown.
   */
  async uploadToS3(): Promise<void> {
    const target = {
      Bucket: this.bucket,
      Key: this.keyName,
      Body: this.uploadStream
    }

    // greets https://stackoverflow.com/a/70159394/1004931
    try {
      const parallelUploads3 = new Upload({
        client: this.s3Client,
        partSize: 1024 * 1024 * 5,
        queueSize: 1,
        leavePartsOnError: false,
        params: target,
      });

      parallelUploads3.on("httpUploadProgress", (progress) => {
        if (progress?.loaded) {
          // console.log(progress)
          // Report the bytes seen on the upload stream, not progress.loaded.
          if (this.onProgress) this.onProgress(this.counter);
          // console.log(`uploaded ${progress.loaded} bytes (${prettyBytes(progress.loaded)})`);
        } else {
          console.log(`httpUploadProgress ${JSON.stringify(progress, null, 2)}`)
        }
      });

      console.log('Waiting for parallelUploads3 to finish...')
      await parallelUploads3.done();
      console.log('parallelUploads3 is complete.')
    } catch (e) {
      if (e instanceof Error) {
        if (e.name === 'AbortError') {
          console.error(`We got an error, AbortError which is something we know how to handle. we will NOT throw and instead return gracefully.`)
          return
        } else {
          console.error(`We were uploading a file to S3 but then we encountered an error! ${JSON.stringify(e, null, 2)}`)
          // JSON.stringify drops an Error's message and stack, so log the object too.
          console.error(e)
          throw e
        }
      } else {
        throw new Error(`error of some sort ${JSON.stringify(e, null, 2)}`)
      }
    }
  }

  /**
   * Wires the input stream into the upload stream and waits for the S3 upload
   * to finish.
   *
   * @returns The job id and the S3 key the recording was written to.
   * @throws UploadStreamClosedError when the upload stream closes while the
   *   input stream is still open.
   * @throws Whatever {@link Record.uploadToS3} rethrows.
   */
  async start(): Promise<{ jobId: string; keyName: string }> {
    // streams setup
    this.uploadStream.on('data', (data) => {
      this.counter += data.length
    })

    // Throwing inside an event listener would surface as an uncaught
    // exception rather than reaching the caller, so a premature close is
    // reported by rejecting this promise, which is raced against the upload.
    const uploadClosedEarly = new Promise<never>((_resolve, reject) => {
      this.uploadStream.on('close', () => {
        // if uploadStream closes before inputStream, reject with an error.
        if (!this.inputStream.closed) {
          const msg = 'upload stream closed before download stream, which suggests the S3 upload failed.'
          console.error(msg)
          reject(new UploadStreamClosedError(msg));
        } else {
          console.log('upload stream has closed. In this instance it is OK since the input stream is also closed.')
        }
      })
    })

    this.uploadStream.on('error', (e) => {
      console.error('there was an error on the uploadStream. error as follows')
      console.error(e)
    })
    // T.M.I.
    // this.uploadStream.on('drain', () => {
    //   console.info('[vvv] drain on uploadStream.')
    // })

    // input stream event handlers
    this.inputStream.on('close', () => {
      console.log('[!!!] input stream has closed.')
    })
    this.inputStream.on('error', (e) => {
      console.error('there was an error on the inputStream. error as follows')
      console.error(e)
    })
    // 'drain' is a writable-side event; it can only fire here if the input
    // happens to be a duplex stream.
    this.inputStream.on('drain', () => {
      console.info('[vvv] drain on inputStream.')
    })

    // pipe the ffmpeg stream to the S3 upload stream
    // this has the effect of uploading the stream to S3 at the same time we're recording it.
    pipeline(
      this.inputStream,
      this.uploadStream,
      (err) => {
        if (err) {
          console.error(`pipeline errored.`)
          console.error(err)
        } else {
          console.log('pipeline succeeded.')
        }
      }
    )

    console.log('awaiting uploadToS3()...')
    // Promise.race subscribes to both promises, so whichever one loses the
    // race cannot produce an unhandled rejection later.
    await Promise.race([this.uploadToS3(), uploadClosedEarly])
    console.log('uploadToS3() is complete.')

    return {
      jobId: this.jobId,
      keyName: this.keyName
    }
  }

  /** Not implemented yet; always rejects. */
  async stop() {
    throw new Error('@todo please implement')
  }
}