// fp/services/capture/src/poc-s3.ts
//
// 123 lines
// 3.3 KiB
// TypeScript

import { PassThrough, pipeline, Readable } from "stream";
import { Upload } from "@aws-sdk/lib-storage";
import { S3Client } from "@aws-sdk/client-s3";
import { createReadStream } from 'fs';
import { createId } from '@paralleldrive/cuid2';
import prettyBytes from 'pretty-bytes';
import dotenv from 'dotenv'
// Load environment variables from '../../.env.development' before anything reads them.
dotenv.config({ path: '../../.env.development' });

// Fail fast when a required bucket setting is absent or empty.
// The variables are checked in the order listed, so the first missing one is reported.
for (const requiredVar of ['S3_BUCKET_NAME', 'S3_BUCKET_KEY_ID', 'S3_BUCKET_APPLICATION_KEY']) {
  if (!process.env[requiredVar]) {
    throw new Error(`${requiredVar} missing in env`);
  }
}
/**
 * Starts a timer that logs, every 30 seconds, how many bytes have passed
 * through the pipeline.
 *
 * @param counter Either a byte count or a function returning the current byte
 *   count. A plain number is copied when this function is called, so every
 *   tick logs that same value; pass a getter (e.g. `() => total`) to have each
 *   tick report the up-to-date total.
 * @returns The interval handle; pass it to `clearInterval` to stop the ticker.
 */
function makeProgressTicker(counter: number | (() => number)) {
  // Resolve the value on every tick so a getter is re-evaluated each time.
  const readBytes = (): number => (typeof counter === 'function' ? counter() : counter)
  const ticker = setInterval(() => {
    const bytes = readBytes()
    console.log(`[progress] ${bytes} bytes (aggregate) (${prettyBytes(bytes)}) have passed through the pipeline.`)
  }, 1000 * 30)
  return ticker
}
/**
 * Builds the S3 client used for uploads.
 *
 * The client targets the Backblaze B2 S3-compatible endpoint in region
 * `us-west-000` and authenticates with the `S3_BUCKET_KEY_ID` /
 * `S3_BUCKET_APPLICATION_KEY` environment variables.
 *
 * @returns A configured `S3Client`.
 */
function makeS3Client() {
  const credentials = {
    accessKeyId: process.env.S3_BUCKET_KEY_ID!,
    secretAccessKey: process.env.S3_BUCKET_APPLICATION_KEY!
  }
  return new S3Client({
    region: 'us-west-000',
    endpoint: 'https://s3.us-west-000.backblazeb2.com',
    credentials
  })
}
/**
 * Uploads a readable stream to the bucket named by `S3_BUCKET_NAME` using the
 * multipart `Upload` helper from `@aws-sdk/lib-storage`.
 *
 * @param client       S3 client used to perform the upload.
 * @param uploadStream Stream whose contents become the object body.
 * @param keyName      Object key to write inside the bucket.
 * @returns Resolves once `Upload.done()` has completed.
 * @throws The original `Error` raised by the upload (stack and any extra
 *   fields preserved), or a generic `Error` describing a non-Error rejection.
 */
async function uploadToS3({ client, uploadStream, keyName }: { client: S3Client, uploadStream: NodeJS.ReadableStream, keyName: string }) {
  const target = {
    Bucket: process.env.S3_BUCKET_NAME!,
    Key: keyName,
    // wrap() adapts the generic NodeJS.ReadableStream into a `Readable` for Body.
    Body: new Readable().wrap(uploadStream)
  }
  // greets https://stackoverflow.com/a/70159394/1004931
  try {
    const parallelUploads3 = new Upload({
      client: client,
      partSize: 1024 * 1024 * 5, // 5 MiB per part
      queueSize: 1, // one part in flight at a time
      leavePartsOnError: false,
      params: target,
    });
    parallelUploads3.on("httpUploadProgress", (progress) => {
      console.log(progress)
      if (progress?.loaded) {
        console.log(`loaded ${progress.loaded} bytes (${prettyBytes(progress.loaded)})`);
      } else {
        console.log(`httpUploadProgress ${JSON.stringify(progress, null, 2)}`)
      }
    });
    await parallelUploads3.done();
  } catch (e) {
    if (e instanceof Error) {
      console.error(`while uploading a file to s3, we encountered an error`)
      // Rethrow the original error rather than a copy of its message, so the
      // stack trace, error name and any SDK metadata reach the caller.
      throw e;
    }
    // JSON.stringify can itself throw (circular references, BigInt); fall back
    // to String() so the real failure is never masked by a formatting error.
    let described: string | undefined
    try {
      described = JSON.stringify(e, null, 2)
    } catch {
      described = String(e)
    }
    throw new Error(`error of some sort ${described}`)
  }
}
/**
 * Proof-of-concept driver: reads a local `.ts` recording, counts the bytes as
 * they flow through a PassThrough stream, and uploads that stream to S3 under
 * a timestamped key.
 *
 * A progress line is logged every 30 seconds while the upload runs. The timer
 * is always cleared, whether the upload succeeds or throws.
 *
 * @throws Whatever `uploadToS3` throws when the upload fails.
 */
async function main() {
  let counter = 0
  const client = makeS3Client()
  // The ticker closes over `counter` so each tick logs the live total; a number
  // handed over as a function argument would be a frozen snapshot (always 0).
  const ticker = setInterval(() => {
    console.log(`[progress] ${counter} bytes (aggregate) (${prettyBytes(counter)}) have passed through the pipeline.`)
  }, 1000 * 30)
  try {
    const datestamp = new Date().toISOString()
    const keyName = `${datestamp}-stream3-chaturbate-${createId()}.ts`
    console.log(`Uploading ${keyName} to S3`)
    /**
     * set up the streams which process the data
     * (`ffmpegStream` is named for its eventual source; here it reads a local file)
     */
    const ffmpegStream = createReadStream('/home/cj/Downloads/stream-23894234.ts')
    const uploadStream = new PassThrough()
    // update the progress ticker data
    uploadStream.on('data', (chunk: Buffer) => {
      counter += chunk.length
    })
    /**
     * the pipeline connects the readable source to the PassThrough;
     * the PassThrough is then handed to the S3 upload as its body
     */
    pipeline(
      ffmpegStream,
      uploadStream,
      (err) => {
        if (err) {
          console.error(`pipeline errored.`)
          console.error(err)
        } else {
          console.log('pipeline succeeded.')
        }
      }
    )
    await uploadToS3({ client, uploadStream, keyName })
  } finally {
    // Stop the ticker even when the upload throws, so the timer cannot keep
    // the process alive after a failure.
    clearInterval(ticker)
  }
}
// Run the PoC. A failure is reported and the process exits non-zero instead of
// leaving the rejected promise unhandled.
main().catch((err: unknown) => {
  console.error(err)
  process.exit(1)
})