import type { Task, Helpers } from "graphile-worker";
import { PrismaClient } from "../../generated/prisma";
import { withAccelerate } from "@prisma/extension-accelerate";
import { getOrDownloadAsset } from "../utils/cache";
import { env } from "../config/env";
import { S3Client } from "@aws-sdk/client-s3";
import { getS3Client, uploadFile } from "../utils/s3";
import { nanoid } from "nanoid";
import { basename, join, dirname } from "node:path";
import { mkdirp, remove } from "fs-extra";
import { listFilesRecursive } from "../utils/filesystem";
import { getMimeType } from "../utils/mimetype";
import { getNanoSpawn } from "../utils/nanoSpawn";

const prisma = new PrismaClient().$extends(withAccelerate());

/** MIME type used when uploading the HLS master playlist. */
const HLS_PLAYLIST_MIMETYPE = "application/vnd.apple.mpegurl";

/** Job payload accepted by the `createHlsPlaylist` task. */
interface Payload {
  vodId: string;
}

/** One transcoded rendition of the source video, ready to be packaged. */
interface HlsVariant {
  /** Path of the transcoded MP4 on local disk. */
  videoPath: string;
  /** e.g. "1920x1080" */
  resolution: string;
  /** in bits per second */
  bandwidth: number;
  mimetype: string;
}

// // Create a fragmented MP4 for use with fMP4 HLS
// async function createFragmentedMp4(helpers: Helpers, inputFilePath: string) {
//   const outputFilePath = join(env.CACHE_ROOT, `${nanoid()}.mp4`)
//   await spawn('ffmpeg', [
//     '-i', inputFilePath,
//     '-c', 'copy',
//     '-f', 'mp4',
//     '-movflags', 'frag_keyframe+empty_moov',
//     outputFilePath
//   ], {
//     stdout: 'inherit',
//     stderr: 'inherit',
//   })
//   return outputFilePath
// }

/**
 * Best-effort recursive delete of a scratch directory.
 *
 * Failures are logged as warnings instead of thrown so that cleanup can never
 * mask the error (or the success) of the work that preceded it.
 */
async function removeQuietly(helpers: Helpers, dirPath: string): Promise<void> {
  try {
    await remove(dirPath);
  } catch (err: unknown) {
    const reason = err instanceof Error ? err.message : String(err);
    helpers.logger.warn(`failed to remove ${dirPath}: ${reason}`);
  }
}

/**
 * Transcode the input video into a fixed ladder of H.264/AAC MP4 renditions
 * (1080p, 720p, 480p) using ffmpeg.
 *
 * The renditions are written into a fresh, randomly named directory under
 * `env.CACHE_ROOT`. On success the caller owns that directory (it is the
 * `dirname` of every returned `videoPath`) and is responsible for deleting it.
 * If any ffmpeg invocation fails, the directory is removed here before the
 * error is rethrown.
 *
 * @param helpers - graphile-worker helpers; used for logging cleanup problems.
 * @param inputFilePath - Local path of the source video.
 * @returns One entry per rendition, in ladder order (highest resolution first).
 * @throws Whatever the spawn helper throws when ffmpeg fails.
 */
export async function createVariants(helpers: Helpers, inputFilePath: string): Promise<HlsVariant[]> {
  const workdir = join(env.CACHE_ROOT, nanoid());
  await mkdirp(workdir);

  const baseName = basename(inputFilePath, ".mp4");
  const spawn = await getNanoSpawn();

  const resolutions = [
    { width: 1920, height: 1080, bitrate: 4000000, name: "1080p" }, // 4Mbps
    { width: 1280, height: 720, bitrate: 2500000, name: "720p" }, // 2.5Mbps
    { width: 854, height: 480, bitrate: 1000000, name: "480p" }, // 1Mbps
  ];

  const outputPaths: HlsVariant[] = [];

  try {
    // Renditions are encoded one after another, not concurrently.
    for (const { width, height, bitrate, name } of resolutions) {
      const outputPath = join(workdir, `${baseName}_${name}.mp4`);

      await spawn(
        "ffmpeg",
        [
          "-i", inputFilePath,
          // Only the first video and first audio stream of the input are used.
          "-map", "0:v:0",
          "-map", "0:a:0",
          "-c:v", "libx264",
          "-preset", "fast",
          "-crf", "23",
          "-b:v", `${bitrate}`,
          "-s", `${width}x${height}`,
          "-c:a", "aac",
          "-b:a", "128k",
          outputPath,
        ],
        {
          stdout: "inherit",
          stderr: "inherit",
        },
      );

      outputPaths.push({
        videoPath: outputPath,
        bandwidth: bitrate,
        resolution: `${width}x${height}`,
        mimetype: "video/mp4",
      });
    }
  } catch (err: unknown) {
    // Nobody else knows about this directory yet, so it would leak otherwise.
    await removeQuietly(helpers, workdir);
    throw err;
  }

  return outputPaths;
}

/**
 * Package the given renditions into an HLS presentation by running the
 * `packager` executable (Shaka Packager).
 *
 * For every variant a video stream descriptor and an audio stream descriptor
 * are passed to the packager; outputs and the master playlist are written to
 * `outputDir`.
 *
 * @param helpers - graphile-worker helpers; used for logging.
 * @param variants - Renditions to package. The array is not modified.
 * @param outputDir - Existing directory that receives the packaged files.
 * @returns Path of the generated `master.m3u8`.
 * @throws Whatever the spawn helper throws when the packager fails.
 */
export async function packageHls(
  helpers: Helpers,
  variants: HlsVariant[],
  outputDir: string
): Promise<string> {
  const args: string[] = [];
  const spawn = await getNanoSpawn();

  // Sort by bandwidth descending. Work on a copy so the caller's array is
  // left untouched.
  const sortedVariants = [...variants].sort((a, b) => b.bandwidth - a.bandwidth);

  for (const variant of sortedVariants) {
    const baseName = basename(variant.videoPath, ".mp4");
    const name = variant.resolution; // "1920x1080", etc.
    const videoOut = join(outputDir, `${baseName}_video.mp4`);
    const audioOut = join(outputDir, `${baseName}_audio.mp4`);

    // Add video stream
    args.push(`input=${variant.videoPath},stream=video,output=${videoOut},hls_name=${name},hls_group_id=video`);

    // Add audio stream (with consistent group-id and friendly name)
    args.push(`input=${variant.videoPath},stream=audio,output=${audioOut},hls_name=Audio ${name},hls_group_id=audio`);
  }

  const masterPlaylist = join(outputDir, "master.m3u8");
  args.push(`--hls_master_playlist_output=${masterPlaylist}`);
  args.push("--generate_static_live_mpd"); // helps keep segments stable
  args.push("--segment_duration=2"); // matches Twitch’s chunk size

  helpers.logger.debug(`packager args: ${JSON.stringify(args)}`);

  await spawn("packager", args, {
    stdout: "inherit",
    stderr: "inherit",
  });

  return masterPlaylist;
}

/**
 * Runtime validation of the job payload.
 *
 * @throws Error if `payload` is not a non-null object with a string `vodId`.
 */
function assertPayload(payload: unknown): asserts payload is Payload {
  if (typeof payload !== "object" || !payload) throw new Error("invalid payload-- was not an object.");
  // Safe after the object check above; lets us read an arbitrary key.
  const { vodId } = payload as Record<string, unknown>;
  if (typeof vodId !== "string") throw new Error("invalid payload-- was missing vodId");
}

/**
 * graphile-worker task: build an adaptive-bitrate HLS package for a vod,
 * upload it to S3 and store the master playlist key on the vod record.
 *
 * Does nothing if the vod already has `hlsPlaylist` set. Scratch directories
 * created for the job are removed when the task finishes, whether it succeeds
 * or fails; the source video returned by `getOrDownloadAsset` is left alone.
 *
 * @param payload - Must be `{ vodId: string }`; validated at runtime.
 * @param helpers - graphile-worker helpers.
 * @throws Error if the payload is invalid, the vod does not exist, the vod has
 *   no `sourceVideo`, or any transcode/package/upload/database step fails.
 */
export default async function createHlsPlaylist(payload: unknown, helpers: Helpers) {
  assertPayload(payload);
  const { vodId } = payload;
  const vod = await prisma.vod.findFirstOrThrow({ where: { id: vodId } });

  // * [x] load vod

  // * [x] exit if video.hlsPlaylist already defined
  if (vod.hlsPlaylist) {
    helpers.logger.info(`Doing nothing-- vod ${vodId} already has a hlsPlaylist.`);
    return; // Exit the function early
  }

  if (!vod.sourceVideo) {
    throw new Error(`Failed to create hlsPlaylist-- vod ${vodId} is missing a sourceVideo.`);
  }

  helpers.logger.info(`Creating HLS Playlist.`);
  const s3Client = getS3Client();
  const taskId = nanoid();
  const workDirPath = join(env.CACHE_ROOT, taskId);
  const packageDirPath = join(workDirPath, "package", "hls");

  // Every scratch directory this job creates; all are deleted in `finally`.
  const scratchDirs = new Set<string>([workDirPath]);

  try {
    await mkdirp(packageDirPath);

    helpers.logger.info("download source video from pull-thru cache");
    const videoFilePath = await getOrDownloadAsset(s3Client, env.S3_BUCKET, vod.sourceVideo);
    helpers.logger.info(`videoFilePath=${videoFilePath}`);

    helpers.logger.info("create ABR variants");
    const variants = await createVariants(helpers, videoFilePath);
    for (const variant of variants) {
      // createVariants writes into its own directory; track it for cleanup.
      scratchDirs.add(dirname(variant.videoPath));
    }
    helpers.logger.info("variants as follows");
    helpers.logger.info(JSON.stringify(variants));

    helpers.logger.info("run shaka packager");
    const masterPlaylistPath = await packageHls(helpers, variants, packageDirPath);
    helpers.logger.debug(`masterPlaylistPath=${masterPlaylistPath}`);

    helpers.logger.info("uploading assets");
    const assets = await listFilesRecursive(workDirPath);
    helpers.logger.info("assets as follows");
    helpers.logger.info(JSON.stringify(assets));

    // * [x] upload assets to s3
    for (const asset of assets) {
      // The master playlist is uploaded last, below, with an explicit MIME type.
      if (asset === masterPlaylistPath) continue;
      const assetKey = `package/${taskId}/hls/${basename(asset)}`;
      const mimetype = getMimeType(asset);
      await uploadFile(s3Client, env.S3_BUCKET, assetKey, asset, mimetype);
    }

    helpers.logger.info("generate master playlist s3 key");
    const s3Key = `package/${taskId}/hls/master.m3u8`;

    // Uploaded after everything it references, so the playlist never appears
    // in the bucket before the other packaged files do.
    await uploadFile(s3Client, env.S3_BUCKET, s3Key, masterPlaylistPath, HLS_PLAYLIST_MIMETYPE);

    // * [x] update vod record
    await prisma.vod.update({
      where: { id: vodId },
      data: { hlsPlaylist: s3Key },
    });

    // * [x] done
  } finally {
    for (const dirPath of scratchDirs) {
      await removeQuietly(helpers, dirPath);
    }
  }
}