import { createBullBoard } from '@bull-board/api'; import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; import { ElysiaAdapter } from '@bull-board/elysia'; import { Queue as QueueMQ, Worker } from 'bullmq'; import Elysia from 'elysia'; import { version } from './package.json'; import PocketBase from 'pocketbase'; import env from './env.ts'; import Mux from '@mux/mux-node'; const { subtle } = globalThis.crypto; async function createToken(playbackId: string) { // Set some base options we can use for a few different signing types // Type can be either video, thumbnail, gif, or storyboard let baseOptions = { keyId: env.MUX_SIGNING_KEY_ID, // Enter your signing key id here keySecret: env.MUX_SIGNING_KEY_PRIVATE_KEY, // Enter your base64 encoded private key here expiration: '7d', // E.g 60, "2 days", "10h", "7d", numeric value interpreted as seconds }; const token = await mux.jwt.signPlaybackId(playbackId, { ...baseOptions, type: 'video' }); console.log('video token', token); // Now the signed playback url should look like this: // https://stream.mux.com/${playbackId}.m3u8?token=${token} // // If you wanted to pass in params for something like a gif, use the // // params key in the options object // const gifToken = await mux.jwt.signPlaybackId(playbackId, { // ...baseOptions, // type: 'gif', // params: { time: '10' }, // }); // console.log('gif token', gifToken); // // Then, use this token in a URL like this: // // https://image.mux.com/${playbackId}/animated.gif?token=${gifToken} // // A final example, if you wanted to sign a thumbnail url with a playback restriction // const thumbnailToken = await mux.jwt.signPlaybackId(playbackId, { // ...baseOptions, // type: 'thumbnail', // params: { playback_restriction_id: YOUR_PLAYBACK_RESTRICTION_ID }, // }); // console.log('thumbnail token', thumbnailToken); // When used in a URL, it should look like this: // https://image.mux.com/${playbackId}/thumbnail.png?token=${thumbnailToken} return token } // function base64ToArrayBuffer(base64: string) { // const binary = Buffer.from(base64, 'base64'); // return binary.buffer.slice(binary.byteOffset, binary.byteOffset + binary.byteLength); // } // async function importPrivateKey(base64Key: string): Promise { // const keyData = base64ToArrayBuffer(base64Key); // return await subtle.importKey( // 'pkcs8', // format for private keys // keyData, // { name: 'RSASSA-PKCS1-v1_5', hash: 'SHA-256' }, // RS256 // false, // not extractable // ['sign'] // usage // ); // } const mux = new Mux({ tokenId: env.MUX_TOKEN_ID, tokenSecret: env.MUX_TOKEN_SECRET }); const pb = new PocketBase(env.POCKETBASE_URL); const sleep = (t: number) => new Promise((resolve) => setTimeout(resolve, t)); const redisOptions = { port: 6379, host: 'localhost', password: '', }; const createQueueMQ = (name: string) => new QueueMQ(name, { connection: redisOptions }); function setupBullMQProcessor(queueName: string) { new Worker( queueName, async (job) => { job.log(`the job ${job.data.title} is running`); const userData = await pb.collection('_superusers').authWithPassword(env.POCKETBASE_USERNAME, env.POCKETBASE_PASSWORD); job.log(`userData ${JSON.stringify(userData)}`); // // @todo presign all mux assets // // 1. for each VOD in pocketbase // // 2. get the muxPlaybackId const vods = await pb.collection('vods').getFullList({ sort: '-created', }); job.log(`there are ${vods.length} vods`); // job.log(JSON.stringify(vods, null, 2)); // 3. sign the muxPlaybackId for (let i = 0; i < vods.length; i++) { const vod = vods[i]; if (!vod) throw new Error(`vod ${i} missing`); if (vod.muxPlaybackId) { // const muxPlaybackToken = await mux.jwt.signPlaybackId(vod.muxPlaybackId, { // expiration: "7d" // }) const muxPlaybackToken = await createToken(vod.muxPlaybackId); job.log(`muxPlaybackToken is ${muxPlaybackToken}`); await pb.collection('vods').update(vod.id, { muxPlaybackToken }); } // Calculate progress as a percentage const progress = Math.round(((i + 1) / vods.length) * 100); await job.updateProgress(progress); } // const record1 = await pb.collection('vods').getOne('RECORD_ID', { // expand: 'relField1,relField2.subRelField', // }); // // list and filter "example" collection records // const result = await pb.collection('vods').getList(1, 20, { // filter: 'status = true && created > "2022-08-01 10:00:00"' // }); // // 4. update the VOD // const record = await pb.collection('demo').update('YOUR_RECORD_ID', { // title: 'Lorem ipsum', // }); // for (let i = 0; i <= 100; i++) { // await sleep(Math.random()); // await job.updateProgress(i); // await job.log(`Processing job at interval ${i}`); // if (Math.random() * 200 < 1) throw new Error(`Random error ${i}`); // } return { recordCount: 'ninja' }; }, { connection: redisOptions } ); } const muxBullMq = createQueueMQ('PresignMux'); setupBullMQProcessor(muxBullMq.name); const serverAdapter = new ElysiaAdapter('/ui'); createBullBoard({ queues: [new BullMQAdapter(muxBullMq)], serverAdapter, options: { // This configuration fixes a build error on Bun caused by eval (https://github.com/oven-sh/bun/issues/5809#issuecomment-2065310008) uiBasePath: 'node_modules/@bull-board/ui', }, }); const app = new Elysia() .onError(({ error, code, request }) => { console.error(error, code, request.method, request.url); if (code === 'NOT_FOUND') return 'NOT_FOUND'; }) .use(serverAdapter.registerPlugin()) .get('/', async () => { return `futureporn worker version ${version}` }) .get('/task', async ({ query }) => { await muxBullMq.add('Add', { title: query.title }); return { ok: true }; }); app.listen(3000, ({ port, url }) => { /* eslint-disable no-console */ console.log(`Running on ${url.hostname}:${port}...`); console.log(`For the UI of instance1, open http://localhost:${port}/ui`); console.log('Make sure Redis is running on port 6379 by default'); console.log('To populate the queue, run:'); console.log(` curl http://localhost:${port}/task?title=Example`); /* eslint-enable no-console */ });