import type { Helpers, Task, Job } from "graphile-worker"; import { PrismaClient } from "../../generated/prisma"; import { withAccelerate } from "@prisma/extension-accelerate"; import { addMinutes, addSeconds } from 'date-fns'; interface Payload { vodId: string; } const prisma = new PrismaClient().$extends(withAccelerate()); const RECHECK_DELAY_SECONDS = 300; /** * * scheduleVodProcessing identities vod processing tasks needed to be run, and schedules those tasks. * */ function isPayloadValid(payload: unknown): payload is Payload { return !!payload && typeof payload === "object" && "vodId" in payload; } const scheduleVodProcessing: Task = async (payload: unknown, helpers) => { if (!isPayloadValid(payload)) { throw new Error("Invalid payload: expected { vodId: string }"); } const { vodId } = payload; helpers.logger.info(`Starting processing for VOD ${vodId}`); const vod = await prisma.vod.findUnique({ where: { id: vodId } }); if (!vod) { helpers.logger.error(`VOD not found: ${vodId}`); return; } // Schedule required jobs const jobs: Promise[] = []; if (!vod.sourceVideo) jobs.push(helpers.addJob("getSourceVideo", { vodId })); if (!vod.sha256sum) jobs.push(helpers.addJob("generateVideoChecksum", { vodId })); if (!vod.thumbnail) jobs.push(helpers.addJob("createVideoThumbnail", { vodId })); if (!vod.hlsPlaylist) jobs.push(helpers.addJob("createHlsPlaylist", { vodId })); if (!vod.asrVtt) jobs.push(helpers.addJob("createAsrVtt", { vodId })); if (!vod.cidv1) jobs.push(helpers.addJob("createIpfsCid", { vodId })); const changes = jobs.length; if (changes > 0) { await Promise.all(jobs); await prisma.vod.update({ where: { id: vodId }, data: { status: "processing" } }); helpers.logger.info(`Scheduled ${changes} jobs for VOD ${vodId}`); // Schedule next check // @huh? @todo IDK what is up with this, but it seems to run right away even though it has the runAt defined. // @huh? @todo Because it runs immediately, this makes it an infinite loop. Disabling for now. // await helpers.addJob("scheduleVodProcessing", { // vodId, // runAt: addMinutes(new Date(), 1) // Check again in 1 minute // }); } else { // All jobs completed - finalize await prisma.vod.update({ where: { id: vodId }, data: { status: "processed" } }); helpers.logger.info(`All processing complete for VOD ${vodId}`); } }; export default scheduleVodProcessing