fp/services/our/src/plugins/streams.ts
CJ_Clippy 553de595d2
Some checks are pending
ci / build (push) Waiting to run
ci / test (push) Waiting to run
fix magnet creation
2025-09-23 18:45:17 -08:00

262 lines
8.2 KiB
TypeScript

import { PrismaClient } from '../../generated/prisma'
import { withAccelerate } from "@prisma/extension-accelerate"
import type { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify'
import { env } from '../config/env'
import { constants } from '../config/constants'
import { pipeline } from 'stream/promises'
import fs from 'fs'
import { tmpdir } from 'node:os'
import { nanoid } from 'nanoid'
import path from 'node:path'
import { type CS3Asset, getS3Client, type S3, uploadFile } from '../utils/s3'
import { S3Client } from '@aws-sdk/client-s3'
import { slug } from '../utils/formatters'
import { run } from '../utils/remove-bg'
import type { OnBehalfQuery } from '../types';
import { getTargetUser } from '../utils/authorization'
import logger from "../utils/logger";
const prisma = new PrismaClient().$extends(withAccelerate())
const s3Client = getS3Client()
const s3Resource: S3 = {
useSSL: true,
port: 443,
bucket: env.S3_BUCKET,
region: env.S3_REGION,
endPoint: env.S3_ENDPOINT,
accessKey: env.S3_KEY_ID,
pathStyle: env.S3_FORCE_PATH_STYLE,
secretKey: env.S3_APPLICATION_KEY
}
export default async function streamsRoutes(
fastify: FastifyInstance,
): Promise<void> {
fastify.post('/image', async function (req, reply) {
const data = await req.file();
if (!data) throw new Error('A file was not received');
// get the user id from the cookie
const twitchUser = req.session.get('twitch_user')
if (!twitchUser?.id) throw new Error('twitch id not found in cookie. please log in before trying again.');
const user = await prisma.user.findFirst({
where: {
twitchId: twitchUser.id
}
})
if (!user?.id) throw new Error('failed to lookup user. please log in and try again.');
logger.debug(`Received /image data. filename=${data.filename}, mimetype=${data.mimetype}, waifu-name=${data.fields['waifu-name']}, remove-bg=${data.fields['remove-bg']}`, data);
let tmpFile = path.join(tmpdir(), nanoid());
await pipeline(data.file, fs.createWriteStream(tmpFile));
const waifuName = data.fields?.['waifu-name']?.value;
if (!waifuName) throw new Error('Missing waifu-name in the form fields');
const removeBg = Boolean(data.fields?.['remove-bg']?.value === "true");
if (removeBg) {
tmpFile = await run(tmpFile)
}
function buildUrl(key: string) {
return `${constants.cdnOrigin}/${key}`;
}
function getS3Key(waifuName: string, filename: string, isWebp: boolean) {
logger.debug(`getS3Key called with ${waifuName} ${filename} ${isWebp}`)
const ext = (isWebp) ? 'webp' : filename.split('.').pop()?.toLowerCase();
return `img/${nanoid()}/${slug(waifuName).substring(0, 24)}.${ext}`
}
const mimetype = (removeBg) ? 'image/webp' : data.mimetype;
const pre: CS3Asset = {
file_path: tmpFile,
mimetype,
bytes: 0,
key: getS3Key(waifuName, data.filename, removeBg),
s3_resource: s3Resource
}
const uploadedAsset = await uploadFile(s3Client, s3Resource, pre, mimetype)
logger.debug('uploadedAsset as follows')
logger.debug(uploadedAsset)
const idk = await createRecord(waifuName, uploadedAsset.key, user.id)
logger.debug('idk as follows')
logger.debug(idk)
const url = buildUrl(idk.imageS3Key)
logger.trace('url as follows')
logger.trace(url)
reply.send({
url
});
async function createRecord(name: string, imageS3Key: string, authorId: number) {
const newWaifu = await prisma.waifu.create({
data: {
name: name,
imageS3Key,
authorId
},
})
logger.debug(newWaifu)
return newWaifu
}
});
fastify.get('/streams', function (request, reply) {
reply.send('This feature is coming soon.')
})
// fastify.post('/redeems', async function (request, reply) {
// // @todo add logic for handling a production redeem originating from Twitch
// const { onBehalfOf } = request.query as OnBehalfQuery;
// // simulated redeems come from the User and the request has a session in the cookie
// const userId = request.session.get('user_id')
// const requester = await prisma.user.findFirstOrThrow({
// where: {
// id: userId
// }
// })
// let targetUser = requester
// if (onBehalfOf) {
// if (!requester.twitchName) {
// return reply.status(500).send({
// error: true,
// message:
// 'Requesting editor does not have a twitchName defined. Please log out and in, then contact admin if error persists.'
// });
// }
// const isSelfRequest = onBehalfOf === requester.twitchName;
// if (!isSelfRequest) {
// if (!authorized) {
// return reply.status(401).send({
// error: true,
// message: 'Requesting editor is not authorized to edit settings for this channel.'
// });
// }
// targetUser = await prisma.user.findFirstOrThrow({
// where: { twitchName: onBehalfOf }
// });
// }
// }
// const { pickId } = request.body as { pickId: number }
// const pick = await prisma.pick.findFirstOrThrow({
// where: {
// id: pickId
// }
// })
// const redeem = await prisma.redeem.create({
// data: {
// user: {
// connect: { id: targetUser.id }
// },
// waifu: {
// connect: { id: pick.waifuId }
// },
// viewerTwitchId: targetUser.twitchId
// }
// })
// reply.send(redeem)
// })
// The challenge with SSE isn't SSE. It's presense, multiple listener sync, and integration within a fastify route handler.
// How do we know when we need to stop sending events?
// how do we track multiple open OBS browser sources?
// I think we would need PubSub
// And we have to manage fastify-sse-v2's bizarre syntax
// and we have to implement an async generator which integrates with an event emitter...
// I tried, I failed. Moving on, back to polling.
// poling btw, is so stupidly simple that it's got dozens of feature built-in.
// like error handling, for when the server goes down temporarily.
// authoritative answers, because the server always returns the full redeems list.
// fuck! polling is good stuff.
// fastify.get('/sse', (request: FastifyRequest, reply: FastifyReply) => {
// const eventEmitter = new EventEmitter();
// const interval = setInterval(() => {
// // logger.debug('> intervalling ' + nanoid())
// eventEmitter.emit('update', {
// name: 'tick',
// time: new Date(),
// });
// }, 100);
// // Async generator producing SSE events
// const asyncIterable = (async function* () {
// logger.debug('iterating!')
// for await (const [event] of on(eventEmitter, 'update', {})) {
// yield {
// event: event.name,
// data: JSON.stringify(event),
// };
// }
// })();
// // Here we assert the asyncIterable matches the expected SSE type
// reply.sse(asyncIterable as unknown as {
// data: string | object;
// event?: string;
// id?: string;
// retry?: number;
// });
// request.raw.on('close', () => {
// // abortController.abort();
// clearInterval(interval);
// reply.sseContext.source.end();
// })
// });
}