fp/services/our/src/plugins/uploads.ts
CJ_Clippy afc9e2d1c8
Some checks failed
ci / build (push) Failing after 2s
ci / Tests & Checks (push) Failing after 1s
add vtuber rss
2025-08-11 20:51:21 -08:00

866 lines
23 KiB
TypeScript

import { PrismaClient } from '../../generated/prisma'
import { withAccelerate } from "@prisma/extension-accelerate"
import type { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify'
import { env } from '../config/env'
import { constants } from '../config/constants'
import { getSignedUrl } from '@aws-sdk/s3-request-presigner'
import type { UploadResult, VodSegment } from '../types/index'
import { isModerator } from '../utils/privs'
import { createId } from '@paralleldrive/cuid2';
import { z } from 'zod';
import {
AbortMultipartUploadCommand,
CompleteMultipartUploadCommand,
CreateMultipartUploadCommand,
ListPartsCommand,
PutObjectCommand,
UploadPartCommand,
} from '@aws-sdk/client-s3'
import { isUnprivilegedUser } from '../utils/privs'
import { getS3Client } from '../utils/s3'
import { UppyFile } from '../types/index'
import mime from 'mime-types'
interface MultipartBody {
type: string
filename: string
metadata?: Record<string, string>
}
function isValidPart(part) {
return (
part &&
typeof part === 'object' &&
Number(part.PartNumber) &&
typeof part.ETag === 'string'
)
}
/**
* @type {S3Client}
*/
let s3Client
const expiresIn = 900 // Define how long until a S3 signature expires.
const accessControlAllowOrigin = env.ORIGIN
// Generate a unique S3 key for the file
const generateS3Key = (ext: string) => `usc/${createId()}.${ext}`
// Extract the file parameters from the request
const extractFileParameters = (req) => {
const isPostRequest = req.method === 'POST'
const params = isPostRequest ? req.body : req.query
return {
filename: params.filename,
contentType: params.type
}
}
// Validate the file parameters
const validateFileParameters = (filename, contentType) => {
if (!filename || !contentType) {
throw new Error('Missing required parameters: filename and content type are required')
}
}
// Util: Validates if partNumber is a number from 1 to 10000
function validatePartNumber(partNumber: any): boolean {
const num = Number(partNumber)
return Number.isInteger(num) && num >= 1 && num <= 10000
}
const patchSchema = {
params: z.object({
uploadId: z.string()
}),
body: z.object({
status: z.enum(['approved', 'rejected', 'pending', 'ordering']).optional(),
segmentKeys: z.string().optional()
})
};
const prisma = new PrismaClient().$extends(withAccelerate())
export default async function uploadsRoutes(
fastify: FastifyInstance,
): Promise<void> {
fastify.get('/uploads', async function (request, reply) {
const userId = request.session.get('userId');
// Get the authenticated user
const user = await prisma.user.findFirstOrThrow({
where: { id: userId },
include: {
roles: true
}
});
const { cursor: cursorRaw, search = '' } = request.query as {
cursor?: string; // query strings are strings
search?: string;
};
const uploads = await prisma.vod.findMany({
where: {
status: {
in: ['ordering', 'pending', 'approved', 'rejected'],
},
},
});
const info = reply.flash('info')
return reply.view('uploads/list.hbs', {
uploads,
user,
info,
site: constants.site,
}, { layout: 'layouts/main.hbs' });
});
fastify.get('/uploads/:uploadId', async function (request, reply) {
const { uploadId } = request.params as { uploadId: string };
const userId = request.session.get('userId');
// Get the authenticated user
const user = await prisma.user.findFirstOrThrow({
where: { id: userId },
include: {
roles: true
}
});
const upload = await prisma.vod.findFirstOrThrow({
where: {
id: uploadId
},
include: {
vtubers: {
select: {
id: true,
displayName: true
}
}
}
});
return reply.view('uploads/show.hbs', {
CDN_ORIGIN: env.CDN_ORIGIN,
upload,
user,
site: constants.site,
}, { layout: 'layouts/main.hbs' });
});
fastify.get('/upload', async function (request, reply) {
return reply.redirect('/vods/new');
});
fastify.get('/vods/new', async function (request, reply) {
const userId = request.session.get('userId');
// Get the authenticated user
const user = await prisma.user.findFirstOrThrow({
where: { id: userId },
include: {
roles: true
}
});
if (isUnprivilegedUser(user)) {
return reply.status(403).send('Uploads are enabled for patrons only.');
}
const vtubers = await prisma.vtuber.findMany({
select: {
slug: true,
displayName: true,
id: true,
},
})
return reply.view('uploads/new.hbs', {
vtubers,
user,
site: constants.site,
}, { layout: 'layouts/main.hbs' });
})
fastify.post('/upload', async function (request, reply) {
const body = request.body as {
uppyResult?: string;
streamDate?: string;
notes?: string;
vtuberIds?: string[];
};
console.log(body)
console.log('uppyResult as follows')
console.log(body.uppyResult)
console.log(`Array.isArray(body.vtuberIds)=${Array.isArray(body.vtuberIds)}`)
const userId = request.session.get('userId');
const site = constants.site
// Get the authenticated user
const user = await prisma.user.findFirstOrThrow({
where: { id: userId },
include: {
roles: true
}
});
const vtubers = await prisma.vtuber.findMany({
select: {
slug: true,
displayName: true,
id: true,
},
})
if (!body.uppyResult) {
return reply.status(400).view('uploads/new.hbs', {
error: 'Missing uppyResult',
vtubers,
user,
site: constants.site,
}, { layout: 'layouts/main.hbs' });
}
if (!body.streamDate) {
return reply.status(400).view('uploads/new.hbs', {
error: 'Missing streamDate',
vtubers,
user,
site
}, { layout: 'layouts/main.hbs' });
}
const vtuberIds = [body.vtuberIds].flat()
if (vtuberIds.length === 0) {
return reply.status(400).view('uploads/new.hbs', {
message: '❌❌❌ vtuberIds were missing from the request. Please try again.',
vtubers,
user,
site
}, { layout: 'layouts/main.hbs' });
}
let data: UploadResult[];
try {
data = JSON.parse(body.uppyResult);
} catch {
return reply.status(400).send('Invalid JSON in uppyResult');
}
if (!userId) return reply.status(401).send('Failed to find userId in session. Please log-in and try again.');
// console.log('data as fllows')
// console.log(data)
if (isUnprivilegedUser(user)) {
return reply.status(403).send('Upload failed-- user is not a patron');
}
// A user can potentially upload more than once in the Uppy uploader (Uppy calls these batches).
// This is different from uploading more than one file.
// It's not likely to happen, but we need to handle the possibility.
// Also it's just how Uppy formats the data, so we have to handle the array of upload iterations.
// I *think* the correct behavior for us is to ignore all but the last batch.
let mostRecentUploadBatch = data.at(-1)
if (!mostRecentUploadBatch) throw new Error('mostRecentUploadBatch not found');
// console.log('mostRecentUploadBatch as follows')
// console.log(mostRecentUploadBatch)
if (mostRecentUploadBatch.failed.length > 0) {
const failedFiles = mostRecentUploadBatch.failed.map((file: UppyFile) => {
return {
name: file.name,
id: file.id,
error: file.error ?? 'Unknown error',
size: file.size,
type: file.type,
meta: file.meta,
};
});
const errorMessage = `One or more uploads failed:\n` + failedFiles
.map(
(file) =>
`${file.name} (${file.type}, ${file.size} bytes): ${file.error}`
)
.join('\n');
const error = new Error(errorMessage);
(error as any).failedFiles = failedFiles;
throw error;
}
const upload = await prisma.vod.create({
data: {
segmentKeys: mostRecentUploadBatch.successful.map((d) => ({ key: d.s3Multipart.key, name: d.name })),
streamDate: new Date(body.streamDate),
notes: body.notes,
status: 'ordering',
uploader: {
connect: {
id: userId
}
},
vtubers: {
connect: vtuberIds!.map((id) => ({ id })) // 👈 expects an array of vtuber IDs
}
}
})
// successful upload
request.flash('info', `✅ Successfully created upload <a href="/uploads/${upload.id}">${upload.id}</a>`)
return reply.redirect('/uploads')
})
// fastify.patch('/uploads/:uploadId', {
// schema: {
// params: {
// type: 'object',
// properties: {
// uploadId: { type: 'string' }
// },
// required: ['uploadId']
// },
// body: {
// type: 'object',
// properties: {
// status: { type: 'string', enum: ['approved', 'rejected', 'pending'] },
// segmentKeys: {
// type: 'string'
// }
// }
// }
// }
// }, async function (
// request: FastifyRequest<{
// Params: { uploadId: string };
// Body: {
// status?: 'approved' | 'rejected' | 'pending';
// segmentKeys?: string;
// };
// }>,
// reply: FastifyReply
// ) {
// const userId = request.session.get('userId');
// if (!userId) {
// return reply.status(401).send('Failed to find userId in session. Please log-in and try again.');
// }
// const user = await prisma.user.findFirstOrThrow({
// where: { id: userId },
// include: { roles: true }
// });
// const { uploadId } = request.params;
// const { status, segmentKeys } = request.body;
// let parsedSegmentKeys: VodSegment[] = [];
// if (segmentKeys) {
// try {
// parsedSegmentKeys = JSON.parse(segmentKeys);
// if (!Array.isArray(parsedSegmentKeys) || !parsedSegmentKeys.every(item => typeof item === 'object' && item.name && item.key)) {
// return reply.status(400).send('Invalid segmentKeys format');
// }
// } catch (error) {
// return reply.status(400).send('Invalid segmentKeys JSON');
// }
// }
// const upload = await prisma.vod.findFirstOrThrow({
// where: { id: uploadId },
// });
// // ⛔ Reject changes by users/supporters if status is not 'pending'
// if (!isModerator(user) && upload.status !== 'pending') {
// return reply.status(403).send('Changes to this Vod can no longer be made.');
// }
// // ⛔ Restrict status change to moderators/admins
// if (status && !isModerator(user)) {
// return reply.status(403).send('Only moderators can change the status.');
// }
// // @todo there is a possible access bypass here. Users are able to edit the segment.key
// // we need to discard any changes to the VodSegment[].key
// try {
// const updatedUpload = await prisma.vod.update({
// where: { id: uploadId },
// data: {
// ...(status && { status }),
// ...(segmentKeys && {
// segmentKeys: parsedSegmentKeys
// })
// }
// });
// return reply.code(200).view('uploads/show.hbs', {
// CDN_ORIGIN: env.CDN_ORIGIN,
// upload: updatedUpload,
// user,
// message: `Saved ${updatedUpload.updatedAt} ✅`,
// });
// } catch (err) {
// request.log.error(err);
// return reply.code(500).send({ error: 'Internal Server Error' });
// }
// });
// async function updateUploadStatus(
// uploadId: string,
// status: 'pending' | 'approved' | 'rejected',
// reason?: string
// ): Promise<Upload> {
// const upload = await prisma.vod.update({
// where: {
// id: uploadId
// },
// data: {
// status
// }
// })
// console.log(`Upload ${uploadId} status updated to ${status}`);
// return upload
// }
/**
* Validates that the newSegments only reorder the originalSegments, without modifying them.
* If valid, returns the normalized reordered array (same references as originalSegments).
* Throws an error if the user attempts to modify any segment content.
*/
function validateAndNormalizeSegmentKeys(
originalSegments: VodSegment[],
rawInput: string
): VodSegment[] {
let newSegments: VodSegment[];
try {
const parsed = JSON.parse(rawInput);
if (!Array.isArray(parsed)) throw new Error();
newSegments = parsed;
} catch {
throw new Error('Invalid JSON format for segmentKeys');
}
if (newSegments.length !== originalSegments.length) {
throw new Error('segmentKeys must not add or remove segments');
}
const isValid = newSegments.every(s =>
originalSegments.some(o => o.key === s.key && o.name === s.name)
);
if (!isValid) {
throw new Error('segmentKeys may only be reordered, not modified');
}
// Normalize: return reordered array using original references
return newSegments.map(s => {
const match = originalSegments.find(o => o.key === s.key && o.name === s.name);
if (!match) throw new Error('Unexpected error during normalization');
return match;
});
}
fastify.patch('/uploads/:uploadId', {
schema: {
params: {
type: 'object',
required: ['uploadId'],
properties: {
uploadId: { type: 'string' }
}
},
body: {
type: 'object',
properties: {
status: { type: 'string', enum: ['ordering', 'approved', 'rejected', 'pending'] },
segmentKeys: { type: 'string' }
}
}
}
}, async function (
request: FastifyRequest<{
Params: { uploadId: string };
Body: { status?: 'ordering' | 'approved' | 'rejected' | 'pending'; segmentKeys?: string };
}>,
reply: FastifyReply
) {
const { uploadId } = request.params;
const { status, segmentKeys } = request.body;
// 🔐 Session validation
const userId = request.session.get('userId');
if (!userId) {
return reply.status(401).send('Unauthorized: Missing session userId.');
}
// 👤 Fetch user and roles
let user;
try {
user = await prisma.user.findFirstOrThrow({
where: { id: userId },
include: { roles: true }
});
} catch (err) {
request.log.error(err);
return reply.status(500).send('Failed to fetch user.');
}
// 🎯 Fetch the target upload
let upload;
try {
upload = await prisma.vod.findFirstOrThrow({
where: { id: uploadId }
});
} catch (err) {
return reply.status(404).send('Upload not found.');
}
// 🛡️ Enforce permission rules
const userIsModerator = isModerator(user);
if (!userIsModerator && (upload.status !== 'pending' && upload.status !== 'ordering')) {
return reply.status(403).send('Only pending uploads can be modified.');
}
if ((status !== 'pending' && status !== 'ordering') && !userIsModerator) {
console.log(`status=${status} and !userIsModerator=${!userIsModerator}`)
return reply.status(403).send('Only moderators can update status.');
}
// 📦 Parse and sanitize segmentKeys
let parsedSegmentKeys: VodSegment[] = [];
if (segmentKeys) {
try {
const originalSegmentKeys = upload.segmentKeys as VodSegment[];
parsedSegmentKeys = validateAndNormalizeSegmentKeys(originalSegmentKeys, segmentKeys);
} catch (err) {
return reply.status(400).send(err.message);
}
}
// 💾 Update the upload
try {
const updatedUpload = await prisma.vod.update({
where: { id: uploadId },
data: {
...(status && { status }),
...(segmentKeys && { segmentKeys: parsedSegmentKeys })
}
});
return reply.code(200).view('uploads/show.hbs', {
CDN_ORIGIN: env.CDN_ORIGIN,
upload: updatedUpload,
user,
site: constants.site,
message: `Saved ${updatedUpload.updatedAt}`,
}, { layout: 'layouts/main.hbs' });
} catch (err) {
request.log.error(err);
return reply.code(500).send({ error: 'Internal Server Error' });
}
});
async function signOnServer(request: FastifyRequest, reply: FastifyReply) {
try {
// ensure user is authed.
const userId = request.session.get('userId')
const user = await prisma.user.findFirstOrThrow({
where: {
id: userId
}
})
const { filename, contentType } = extractFileParameters(request)
validateFileParameters(filename, contentType)
console.log(`User ${user.id} is uploading ${filename} (${contentType}).`);
const ext = mime.extension(contentType)
const Key = generateS3Key(ext)
const url = await getSignedUrl(
getS3Client(),
new PutObjectCommand({
Bucket: process.env.S3_BUCKET,
Key,
ContentType: contentType,
}),
{ expiresIn }
)
reply
.header('Access-Control-Allow-Origin', accessControlAllowOrigin)
.send({
url,
method: 'PUT',
})
} catch (err) {
request.log.error(err)
reply.code(400).send({ error: err.message })
}
}
fastify.get('/s3/params', signOnServer)
fastify.post('/s3/sign', signOnServer)
fastify.post('/s3/multipart', async (request, reply) => {
const client = getS3Client()
const { type, metadata, filename } = request.body as MultipartBody
if (typeof filename !== 'string') {
return reply.code(400).send({ error: 's3: content filename must be a string' })
}
if (typeof type !== 'string') {
return reply.code(400).send({ error: 's3: content type must be a string' })
}
const ext = mime.extension(type)
const Key = generateS3Key(ext)
const command = new CreateMultipartUploadCommand({
Bucket: process.env.S3_BUCKET,
Key,
ContentType: type,
Metadata: metadata,
})
try {
const data = await client.send(command)
reply
.header('Access-Control-Allow-Origin', accessControlAllowOrigin)
.send({
key: data.Key,
uploadId: data.UploadId,
})
} catch (err) {
console.error(err)
request.log.error(err)
reply.code(500).send({ error: 'Failed to create multipart upload' })
}
})
fastify.get('/s3/multipart/:uploadId', async (request, reply) => {
const client = getS3Client()
const { uploadId } = request.params as { uploadId: string }
const { key } = request.query as { key?: string }
console.log(`s3 multipart with uploadId=${uploadId}, key=${key}`)
if (typeof key !== 'string') {
reply.status(400).send({
error:
's3: the object key must be passed as a query parameter. For example: "?key=abc.jpg"',
})
return
}
const parts: any[] = []
async function listPartsPage(startAt?: number): Promise<void> {
try {
const data = await client.send(
new ListPartsCommand({
Bucket: process.env.S3_BUCKET,
Key: key,
UploadId: uploadId,
PartNumberMarker: startAt,
})
)
parts.push(...(data.Parts ?? []))
if (data.IsTruncated && data.NextPartNumberMarker) {
await listPartsPage(data.NextPartNumberMarker)
}
} catch (err) {
reply.send(err)
return
}
}
await listPartsPage()
reply.send(parts)
})
fastify.get('/s3/multipart/:uploadId/:partNumber', async (request, reply) => {
const { uploadId, partNumber } = request.params as {
uploadId: string
partNumber: string
}
const { key } = request.query as { key?: string }
if (!validatePartNumber(partNumber)) {
return reply.status(400).send({
error: 's3: the part number must be an integer between 1 and 10000.',
})
}
if (typeof key !== 'string') {
return reply.status(400).send({
error:
's3: the object key must be passed as a query parameter. For example: "?key=abc.jpg"',
})
}
try {
const url = await getSignedUrl(
getS3Client(),
new UploadPartCommand({
Bucket: process.env.S3_BUCKET,
Key: key,
UploadId: uploadId,
PartNumber: Number(partNumber),
Body: '', // Body must be set, but actual upload happens client-side
}),
{ expiresIn }
)
reply.header('Access-Control-Allow-Origin', accessControlAllowOrigin)
return reply.send({ url, expires: expiresIn })
} catch (err) {
request.log.error(err)
return reply.status(500).send({ error: 'Failed to generate presigned URL.' })
}
})
fastify.post('/s3/multipart/:uploadId/complete', async (request, reply) => {
const userId = request.session.get('userId')
console.log(`userId=${userId}`)
if (!userId) return reply.status(401).send('User id not found in session. Please log-in.');
const client = getS3Client()
const { uploadId } = request.params as { uploadId: string }
const { key } = request.query as { key?: string }
const { parts } = request.body as { parts?: Array<{ ETag: string; PartNumber: number }> }
if (typeof key !== 'string') {
reply.status(400).send({
error: 's3: the object key must be passed as a query parameter. For example: "?key=abc.jpg"',
})
return
}
const isValidPart = (part: any): part is { ETag: string; PartNumber: number } =>
part &&
typeof part === 'object' &&
typeof part.ETag === 'string' &&
typeof part.PartNumber === 'number'
if (!Array.isArray(parts) || !parts.every(isValidPart)) {
reply.status(400).send({
error: 's3: `parts` must be an array of {ETag, PartNumber} objects.',
})
return
}
try {
const data = await client.send(
new CompleteMultipartUploadCommand({
Bucket: process.env.S3_BUCKET,
Key: key,
UploadId: uploadId,
MultipartUpload: { Parts: parts },
})
)
reply.header('Access-Control-Allow-Origin', accessControlAllowOrigin).send({
location: data.Location,
})
} catch (err) {
console.error('there was an error during CompleteMultipartUploadCommand.')
console.error(err)
reply.send(err)
}
})
fastify.delete('/s3/multipart/:uploadId', async (request, reply) => {
const client = getS3Client()
const { uploadId } = request.params as { uploadId: string }
const { key } = request.query as { key?: string }
if (typeof key !== 'string') {
reply.status(400).send({
error: 's3: the object key must be passed as a query parameter. For example: "?key=abc.jpg"',
})
return
}
try {
await client.send(
new AbortMultipartUploadCommand({
Bucket: process.env.S3_BUCKET,
Key: key,
UploadId: uploadId,
})
)
reply.send({})
} catch (err) {
reply.send(err)
}
})
}