import type { Task } from "graphile-worker"; import { PrismaClient } from "../../generated/prisma"; import { createHash } from 'crypto'; import { access } from "node:fs/promises"; import { pipeline } from 'stream/promises'; import path from 'path'; import { createReadStream } from "node:fs"; import { getOrDownloadAsset } from "../utils/cache"; import { env } from "../config/env"; import { getS3Client } from "../utils/s3"; const prisma = new PrismaClient(); interface Payload { vodId: string; } const client = getS3Client() const generateVideoChecksum: Task = async (payload: unknown, helpers) => { const { vodId } = payload as Payload; helpers.logger.info(`Generating checksum for VOD ${vodId}`); // 1. Get VOD record with source video path const vod = await prisma.vod.findUnique({ where: { id: vodId }, select: { sourceVideo: true } }); if (!vod?.sourceVideo) { throw new Error(`VOD ${vodId} has no source video`) } // 2. Verify file exists const videoPath = await getOrDownloadAsset(client, env.S3_BUCKET, vod.sourceVideo) helpers.logger.info(`videoPath=${videoPath}`) try { await access(videoPath); } catch (err) { throw new Error(`Source video not found at ${videoPath}`); } // 3. Generate SHA-256 hash try { const hash = createHash('sha256'); const fileStream = createReadStream(videoPath); await pipeline( fileStream, hash ); const checksum = hash.digest('hex'); helpers.logger.info(`Generated checksum for ${path.basename(vod.sourceVideo)}: ${checksum}`); // 4. Update VOD record await prisma.vod.update({ where: { id: vodId }, data: { sha256sum: checksum } }); } catch (err) { helpers.logger.error(`Failed to generate checksum: ${err.message}`); throw err; // Will trigger retry if configured } }; export default generateVideoChecksum;