import { PrismaClient } from '../../generated/prisma'
import { withAccelerate } from "@prisma/extension-accelerate"
import type { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify'
import { env } from '../config/env'
import { constants } from '../config/constants'
import { pipeline } from 'stream/promises'
import fs from 'fs'
import { tmpdir } from 'node:os'
import { nanoid } from 'nanoid'
import path from 'node:path'
import { type CS3Asset, getS3Client, type S3, uploadFile } from '../utils/s3'
import { S3Client } from '@aws-sdk/client-s3'
import { slug } from '../utils/formatters'
import { run } from '../utils/remove-bg'
import type { OnBehalfQuery } from '../types';
import { getTargetUser } from '../utils/authorization'
import logger from "../utils/logger";

const prisma = new PrismaClient().$extends(withAccelerate())
const s3Client = getS3Client()

// Connection details for the bucket that stores uploaded waifu images.
const s3Resource: S3 = {
  useSSL: true,
  port: 443,
  bucket: env.S3_BUCKET,
  region: env.S3_REGION,
  endPoint: env.S3_ENDPOINT,
  accessKey: env.S3_KEY_ID,
  pathStyle: env.S3_FORCE_PATH_STYLE,
  secretKey: env.S3_APPLICATION_KEY
}

/** Builds the public URL for an S3 object key by prefixing the CDN origin. */
function buildUrl(key: string): string {
  return `${constants.cdnOrigin}/${key}`
}

/**
 * Builds the S3 object key for an uploaded image.
 *
 * @param waifuName - Display name; slugified and truncated to 24 characters.
 * @param filename - Client-supplied filename; only its last dot-separated segment is used.
 * @param isWebp - When true the extension is forced to `webp`.
 * @returns A key of the form `img/<nanoid>/<slug>.<ext>`.
 */
function getS3Key(waifuName: string, filename: string, isWebp: boolean): string {
  logger.debug(`getS3Key called with ${waifuName} ${filename} ${isWebp}`)
  const ext = isWebp ? 'webp' : filename.split('.').pop()?.toLowerCase()
  return `img/${nanoid()}/${slug(waifuName).substring(0, 24)}.${ext}`
}

/** Inserts a waifu row pointing at the uploaded image and returns the created record. */
async function createRecord(name: string, imageS3Key: string, authorId: number) {
  const newWaifu = await prisma.waifu.create({
    data: { name, imageS3Key, authorId },
  })
  logger.debug(newWaifu)
  return newWaifu
}

/**
 * Removes temporary files, ignoring files that no longer exist.
 * Failures are logged rather than thrown so cleanup never masks the handler's own outcome.
 */
async function removeTempFiles(tempPaths: Iterable<string>): Promise<void> {
  await Promise.all(
    [...tempPaths].map(async (tempPath) => {
      try {
        await fs.promises.rm(tempPath, { force: true })
      } catch (err: unknown) {
        logger.debug(`failed to remove temp file ${tempPath}`)
        logger.debug(err)
      }
    }),
  )
}

/**
 * Registers the image upload, picks, redeems, streams and OBS routes on the
 * given Fastify instance.
 *
 * Routes registered: `POST /image`, `GET|POST|DELETE /picks`,
 * `GET|POST|DELETE /redeems`, `GET /streams`, `GET /obs/:obsToken` and
 * `GET /obs/:obsToken/redeems`.
 *
 * @param fastify - The Fastify instance to attach the routes to.
 */
export default async function streamsRoutes(
  fastify: FastifyInstance,
): Promise<void> {

  // Upload an image, optionally strip its background, store it in S3 and
  // create the matching waifu record. Responds with `{ url }`.
  fastify.post('/image', async function (req, reply) {
    const data = await req.file()
    if (!data) throw new Error('A file was not received')

    // get the user id from the cookie
    const twitchUser = req.session.get('twitch_user')
    if (!twitchUser?.id) throw new Error('twitch id not found in cookie. please log in before trying again.')

    const user = await prisma.user.findFirst({ where: { twitchId: twitchUser.id } })
    if (!user?.id) throw new Error('failed to lookup user. please log in and try again.')

    const uploadPath = path.join(tmpdir(), nanoid())
    // Every temp file created for this request; all are removed in `finally`.
    const tempPaths = new Set<string>([uploadPath])

    try {
      await pipeline(data.file, fs.createWriteStream(uploadPath))

      // NOTE(review): the form fields are read only after the upload has been
      // written to disk, as in the original. Presumably fields that follow the
      // file part are not parsed until the file stream is consumed — confirm
      // against the multipart plugin docs before reordering.
      const waifuName = data.fields?.['waifu-name']?.value
      const removeBg = data.fields?.['remove-bg']?.value === "true"
      logger.debug(`Received /image data. filename=${data.filename}, mimetype=${data.mimetype}, waifu-name=${String(waifuName)}, remove-bg=${removeBg}`)

      if (typeof waifuName !== 'string' || !waifuName) {
        throw new Error('Missing waifu-name in the form fields')
      }

      let filePath = uploadPath
      if (removeBg) {
        filePath = await run(uploadPath)
        tempPaths.add(filePath)
      }

      // A background-removed image is uploaded as webp (see the forced
      // extension in getS3Key).
      const mimetype = removeBg ? 'image/webp' : data.mimetype
      const pre: CS3Asset = {
        file_path: filePath,
        mimetype,
        bytes: 0,
        key: getS3Key(waifuName, data.filename, removeBg),
        s3_resource: s3Resource
      }

      const uploadedAsset = await uploadFile(s3Client, s3Resource, pre, mimetype)
      logger.debug('uploadedAsset as follows')
      logger.debug(uploadedAsset)

      const waifu = await createRecord(waifuName, uploadedAsset.key, user.id)
      logger.debug('idk as follows')
      logger.debug(waifu)

      const url = buildUrl(waifu.imageS3Key)
      logger.trace('url as follows')
      logger.trace(url)

      return reply.send({ url })
    } finally {
      await removeTempFiles(tempPaths)
    }
  })

  // List the target user's most recent picks, limited to their choice pool size.
  fastify.get('/picks', async function (request: FastifyRequest, reply: FastifyReply) {
    const targetUser = await getTargetUser(request, reply)

    const picks = await prisma.pick.findMany({
      where: { userId: targetUser.id },
      orderBy: { createdAt: 'desc' },
      take: targetUser.waifuChoicePoolSize,
      include: { waifu: true }
    })

    return reply.send({ data: picks })
  })

  // Create a pick for the target user and queue a reward consolidation job.
  fastify.post('/picks', async function (request: FastifyRequest, reply: FastifyReply) {
    const targetUser = await getTargetUser(request, reply)
    const userId = targetUser.id
    // Guard against a missing body so the 400 below is reachable.
    const { waifuId } = (request.body ?? {}) as { waifuId?: number }

    logger.debug(`userId=${userId}, waifuId=${waifuId}`)

    if (!userId) {
      return reply.code(400).send({ error: 'Missing userId' })
    }
    if (!waifuId) {
      return reply.code(400).send({ error: 'Missing waifuId' })
    }

    // Only the new id is returned, so no relations are loaded.
    const pick = await prisma.pick.create({
      data: { userId, waifuId },
    })

    logger.debug("~~~~~~ adding graphileWorker job")
    await fastify.graphileWorker.addJob('consolidate_twitch_channel_rewards', { userId })

    return reply.send({ id: pick.id })
  })

  // Delete one of the target user's picks. Responds with the deleted pick id.
  fastify.delete('/picks', async function (request: FastifyRequest, reply: FastifyReply) {
    // Resolved the same way as POST /picks so a pick can be deleted by whoever
    // was allowed to create it.
    const targetUser = await getTargetUser(request, reply)
    const userId = targetUser.id
    const { pickId } = (request.body ?? {}) as { pickId?: number }

    logger.debug(`userId=${userId}, pickId=${pickId}`)

    if (!userId || !pickId) {
      return reply.code(400).send({ error: 'Missing userId or pickId' })
    }

    // Scope the delete to the owner; deleting by id alone would let any
    // logged-in user remove another user's pick.
    const { count } = await prisma.pick.deleteMany({
      where: { id: pickId, userId }
    })
    if (count === 0) {
      return reply.code(404).send({ error: 'Pick not found' })
    }

    return reply.send(pickId)
  })

  // List the logged-in user's redeems created after their clear cursor.
  fastify.get('/redeems', async function (request, reply) {
    const userId = request.session.get('user_id')
    // Prisma treats an `undefined` filter value as "no filter", so without
    // this guard an anonymous request would match an arbitrary user.
    if (!userId) {
      return reply.code(401).send({ error: 'Not logged in' })
    }

    const user = await prisma.user.findFirstOrThrow({ where: { id: userId } })

    const redeems = await prisma.redeem.findMany({
      where: {
        userId: user.id,
        createdAt: { gt: new Date(user.clearRedeemsCursor) }
      },
      orderBy: { createdAt: 'desc' },
      take: user.maxOnScreenWaifus,
      include: { waifu: true }
    })

    return reply.send({ data: redeems })
  })

  // Create a redeem for the target user from one of the existing picks.
  fastify.post('/redeems', async function (request, reply) {
    const targetUser = await getTargetUser(request, reply)
    logger.debug(`we are creating a redeem and the targetuser is id=${targetUser.id}`)

    const { pickId } = (request.body ?? {}) as { pickId?: number }
    if (!pickId) throw new Error('pickId was missing')

    // NOTE(review): the pick is looked up by id only, so it may belong to a
    // different user than targetUser — confirm whether that is intended.
    const pick = await prisma.pick.findFirstOrThrow({ where: { id: pickId } })

    const redeem = await prisma.redeem.create({
      data: {
        user: { connect: { id: targetUser.id } },
        waifu: { connect: { id: pick.waifuId } },
        viewerTwitchId: targetUser.twitchId
      },
      include: { waifu: true }
    })

    return reply.send({ data: redeem })
  })

  // "Clear" redeems by moving the user's cursor to now; rows are not deleted,
  // the read routes simply filter on createdAt > clearRedeemsCursor.
  fastify.delete('/redeems', async function (request, reply) {
    const targetUser = await getTargetUser(request, reply)
    logger.debug(`we are deleting redeems and the targetuser is id=${targetUser.id}`)

    await prisma.user.update({
      where: { id: targetUser.id },
      data: { clearRedeemsCursor: new Date().toISOString() }
    })

    return reply.send({ data: [] })
  })

  fastify.get('/streams', function (request, reply) {
    reply.send('This feature is coming soon.')
  })

  // fastify.post('/redeems', async function (request, reply) {
  //   // @todo add logic for handling a production redeem originating from Twitch
  //   const { onBehalfOf } = request.query as OnBehalfQuery;
  //   // simulated redeems come from the User and the request has a session in the cookie
  //   const userId = request.session.get('user_id')
  //   const requester = await prisma.user.findFirstOrThrow({
  //     where: {
  //       id: userId
  //     }
  //   })
  //   let targetUser = requester
  //   if (onBehalfOf) {
  //     if (!requester.twitchName) {
  //       return reply.status(500).send({
  //         error: true,
  //         message:
  //           'Requesting editor does not have a twitchName defined. Please log out and in, then contact admin if error persists.'
  //       });
  //     }
  //     const isSelfRequest = onBehalfOf === requester.twitchName;
  //     if (!isSelfRequest) {
  //       const authorized = await isEditorAuthorized(requester.twitchName, onBehalfOf);
  //       if (!authorized) {
  //         return reply.status(401).send({
  //           error: true,
  //           message: 'Requesting editor is not authorized to edit settings for this channel.'
  //         });
  //       }
  //       targetUser = await prisma.user.findFirstOrThrow({
  //         where: { twitchName: onBehalfOf }
  //       });
  //     }
  //   }
  //   const { pickId } = request.body as { pickId: number }
  //   const pick = await prisma.pick.findFirstOrThrow({
  //     where: {
  //       id: pickId
  //     }
  //   })
  //   const redeem = await prisma.redeem.create({
  //     data: {
  //       user: {
  //         connect: { id: targetUser.id }
  //       },
  //       waifu: {
  //         connect: { id: pick.waifuId }
  //       },
  //       viewerTwitchId: targetUser.twitchId
  //     }
  //   })
  //   reply.send(redeem)
  // })

  // Render the OBS browser-source page for the user owning this token.
  fastify.get('/obs/:obsToken', async function (
    request: FastifyRequest<{ Params: { obsToken: string } }>,
    reply: FastifyReply
  ) {
    const { obsToken } = request.params
    const { cdnOrigin } = constants

    // Lookup is only used to reject unknown tokens (throws when none matches).
    await prisma.user.findFirstOrThrow({ where: { obsToken } })

    return reply.view("obs.ejs", { obsToken, cdnOrigin }, { layout: 'layouts/main.hbs' })
  })

  // Polling endpoint for the OBS page: same query as GET /redeems, but the
  // user is identified by OBS token instead of the session.
  fastify.get('/obs/:obsToken/redeems', async function (
    request: FastifyRequest<{ Params: { obsToken: string } }>,
    reply: FastifyReply
  ) {
    const user = await prisma.user.findFirstOrThrow({
      where: { obsToken: request.params.obsToken }
    })

    const redeems = await prisma.redeem.findMany({
      where: {
        userId: user.id,
        createdAt: { gt: new Date(user.clearRedeemsCursor) }
      },
      orderBy: { createdAt: 'desc' },
      take: user.maxOnScreenWaifus,
      include: { waifu: true }
    })

    return reply.send({ data: redeems })
  })

  // The challenge with SSE isn't SSE itself. It's presence, keeping multiple
  // listeners in sync, and integration within a fastify route handler.
  // How do we know when we need to stop sending events?
  // How do we track multiple open OBS browser sources?
  // I think we would need PubSub,
  // and we would have to manage fastify-sse-v2's unusual syntax,
  // and we would have to implement an async generator which integrates with an event emitter...
  // I tried, I failed. Moving on, back to polling.
  // Polling, by the way, is so simple that it has dozens of features built in:
  // error handling, for when the server goes down temporarily;
  // authoritative answers, because the server always returns the full redeems list.
  // Polling is good stuff.

  // fastify.get('/sse', (request: FastifyRequest, reply: FastifyReply) => {
  //   const eventEmitter = new EventEmitter();
  //   const interval = setInterval(() => {
  //     // logger.debug('> intervalling ' + nanoid())
  //     eventEmitter.emit('update', {
  //       name: 'tick',
  //       time: new Date(),
  //     });
  //   }, 100);
  //   // Async generator producing SSE events
  //   const asyncIterable = (async function* () {
  //     logger.debug('iterating!')
  //     for await (const [event] of on(eventEmitter, 'update', {})) {
  //       yield {
  //         event: event.name,
  //         data: JSON.stringify(event),
  //       };
  //     }
  //   })();
  //   // Here we assert the asyncIterable matches the expected SSE type
  //   reply.sse(asyncIterable as unknown as {
  //     data: string | object;
  //     event?: string;
  //     id?: string;
  //     retry?: number;
  //   });
  //   request.raw.on('close', () => {
  //     // abortController.abort();
  //     clearInterval(interval);
  //     reply.sseContext.source.end();
  //   })
  // });
}