This commit is contained in:
Chris Grimmett 2023-09-26 03:29:44 -08:00
parent b84e4d50c0
commit 4341e991f1
50 changed files with 4248 additions and 61 deletions

3
.gitignore vendored
View File

@ -1,3 +1,6 @@
.env*
venv*
tunnel.conf
.pnp.*

View File

@ -2,18 +2,11 @@
Quality Assurance
Home of Futureporn.net Discord Futurebutt [bot]
## Features
* [x] When a record in the DB is updated, run tasks.
* [x] Triggered by Strapi create, update, and publish webhooks.
* [x] Idempotent operation
* [x] Posts messages in Discord
A long-running tasks
## Issue tracker
Please use https://github.com/insanity54/futureporn/issues
Please use https://gitea.futureporn.net/futureporn/futureporn-qa/issues
## dev notes

71
cli.ts Executable file
View File

@ -0,0 +1,71 @@
import * as dotenv from 'dotenv'
import PgBoss from 'pg-boss';
import {identifyVodIssues} from './tasks/identifyVodIssues.js';
import {generateThumbnail} from './tasks/generateThumbnail.js';
import {deleteThumbnail} from './tasks/deleteThumbnail.js';
import {createMuxAsset} from './tasks/createMuxAsset.js';
import yargs from 'yargs/yargs';
import { hideBin } from 'yargs/helpers';
export interface IJobData {
id: number;
env: NodeJS.ProcessEnv;
}
dotenv.config()
if (!process.env.PG_DSN) throw new Error('PG_DSN is missing in env')
const PG_DSN = process.env.PG_DSN
yargs(hideBin(process.argv))
.command('verify <id>', 'find and fix issues with a specific vod', () => {
console.log('builder')
}, async (argv: { id: number }) => {
const boss = new PgBoss(PG_DSN)
await boss.start();
console.log(`verifying with id:${argv.id}. PG_DSN:${process.env.PG_DSN}`)
await boss.send('identifyVodIssues', { id: argv.id }, { priority: 100 })
const jobs = await boss.fetch('identifyVodIssues', 1) as PgBoss.Job[]
console.log(jobs)
const job = jobs[0]
if (!job) {
console.log('there is no identifyVodIssues job available')
return
}
try {
await identifyVodIssues(job, boss);
boss.complete(job.id);
const createJobs = await boss.fetch('createMuxAsset', 1) as PgBoss.Job[]
console.log(createJobs)
const createJob = createJobs[0];
if (!createJob) {
console.log(`there is no createMuxAsset job available`)
return
}
try {
console.log('create')
await createMuxAsset(createJob, process.env);
boss.complete(createJob.id)
} catch (e) {
boss.fail(createJob.id, e)
}
} catch (e) {
boss.fail(job.id, e);
}
// console.log('sending job')
})
.demandCommand(1)
.help()
.parse()

20
environment.d.ts vendored Normal file
View File

@ -0,0 +1,20 @@
declare global {
namespace NodeJS {
interface ProcessEnv {
TMPDIR: string;
B2_BUCKET: string;
B2_ENDPOINT: string;
B2_KEY: string;
B2_REGION: string;
B2_SECRET: string;
STRAPI_API_KEY: string;
MUX_TOKEN_SECRET: string;
MUX_TOKEN_ID: string;
}
}
}
// If this file has no import/export statements (i.e. is a script)
// convert it into a module by adding an empty export statement.
export {}

View File

@ -16,8 +16,6 @@ dotenv.config()
import { loggerFactory } from "./lib/logger.js"
import Fastify from 'fastify'
import Cluster from './lib/Cluster.js'
import { join } from 'node:path';
import { homedir } from 'os';
import S from 'fluent-json-schema'
import fs from 'node:fs';
import fastq from 'fastq'
@ -95,7 +93,7 @@ const webhookBodyJsonSchema = S.object()
.valueOf()
const schema = {
const webhookSchema = {
body: webhookBodyJsonSchema,
}
@ -131,7 +129,7 @@ async function tasks (body, cb) {
// This is used by Strapi.
// When a new VOD is created, Strapi GETs the route
// QA responds by adding the IPFS hash
fastify.post('/webhook', { schema }, async (request, reply) => {
fastify.post('/webhook', { schema: webhookSchema }, async (request, reply) => {
logger.log({ level: 'info', message: `Webhook was hit!` })
reply.type('application/json')
@ -153,6 +151,19 @@ fastify.post('/webhook', { schema }, async (request, reply) => {
}
})
fastify.post('/thumbnails', { schema: thumbnailsSchema }, async (request, reply) => {
logger.log({ level: 'info', message: `Webhook was hit!` })
reply.type('application/json')
const body = request?.body
if (body === undefined) {
reply.code(400)
return {
message: 'body must be defined, but it was undefined'
}
}
logger.log({ level: 'info', message: })
})
// Run the server!

213
index.ts Normal file
View File

@ -0,0 +1,213 @@
/**
Quality Assurance
* [x] serve a webhook
* [x] when webhook is hit, run a series of tasks
* [x] `ipfs pin add` the
* [x] videoSrcHash
* [x] video240Hash
* [x] thiccHash
*/
import * as dotenv from 'dotenv'
dotenv.config()
import { loggerFactory } from "./src/logger.js"
import fastify from 'fastify'
// import Cluster from './src/Cluster'
import S from 'fluent-json-schema'
import fs from 'node:fs';
import fastq from 'fastq'
import { taskAllocateMux } from './src/taskAllocateMux.js'
import { taskPinIpfsContent } from './src/taskPinIpfsContent.js'
import taskContinueOnlyIfPublished from './src/taskContinueOnlyIfPublished.js'
import taskAssertThumbnail from './src/taskAssertThumbnail.js'
import taskDownloadVideoSrcB2 from './src/taskDownloadVideoSrcB2.js'
import taskAddVideoSrcHash from './src/taskAddVideoSrcHash.js'
import taskAddVideo240Hash from './src/taskAddVideo240Hash.js'
import taskTriggerWebsiteBuild from './src/taskTriggerWebsiteBuild.js'
import taskAssertFfmpeg from './src/taskAssertFfmpeg.js'
interface IWebhookBody {
event: 'string';
model: 'string';
uid: 'string';
entry: {
id: number;
videoSrcHash: string;
video240Hash: string;
}
}
// {
// event: 'entry.update',
// createdAt: '2023-05-08T09:58:12.118Z',
// model: 'vod',
// uid: 'api::vod.vod',
// entry: {
// id: 29,
// videoSrcHash: null,
// video720Hash: null,
// video480Hash: null,
// video360Hash: null,
// video240Hash: null,
// thinHash: null,
// thiccHash: null,
// announceTitle: null,
// announceUrl: null,
// note: 'asdfsdkljkksdsdd',
// date: '2024-04-22T08:00:00.000Z',
// spoilers: null,
// title: null,
// createdAt: '2023-05-08T09:48:48.052Z',
// updatedAt: '2023-05-08T09:58:12.102Z',
// publishedAt: '2023-05-08T09:51:39.409Z',
// tags: { count: 0 },
// uploader: { count: 1 },
// backup: { count: 1 },
// muxAsset: { count: 1 }
// }
// }
const version = JSON.parse(fs.readFileSync('./package.json', { encoding: 'utf-8' })).version
const appEnv: string[] = new Array(
'PORT',
'NODE_ENV',
'IPFS_CLUSTER_HTTP_API_MULTIADDR',
'IPFS_CLUSTER_HTTP_API_USERNAME',
'IPFS_CLUSTER_HTTP_API_PASSWORD',
'MUX_TOKEN_ID',
'MUX_TOKEN_SECRET',
'STRAPI_URL',
'STRAPI_API_KEY',
'B2_ENDPOINT',
'B2_BUCKET',
'B2_SECRET',
'B2_KEY',
'B2_REGION',
'TMPDIR',
'FUTUREPORN_WORKDIR',
)
const logger = loggerFactory({
service: 'futureporn/qa'
})
const workQueue = fastq(tasks, 1)
const server = fastify({
logger: false,
})
// const cluster = new Cluster({
// uri: 'https://cluster.sbtp.xyz:9094',
// username: process.env.IPFS_CLUSTER_HTTP_API_USERNAME,
// password: process.env.IPFS_CLUSTER_HTTP_API_PASSWORD
// })
const appContext = {
env: appEnv.reduce((acc, ev) => {
const envValue = process.env[ev];
if (typeof envValue === 'undefined') {
throw new Error(`${ev} is undefined in env`);
}
acc[ev] = envValue;
return acc;
}, {} as Record<string, string>),
logger,
// cluster,
changed: false,
build: null,
};
const webhookBodyJsonSchema = S.object()
.prop('event', S.string()).required()
.prop('model', S.string())
.prop('entry', S.object()).required()
.valueOf()
const schema = {
body: webhookBodyJsonSchema,
}
/**
* WARNING-- All tasks must be idempotent in order to not cause an endless loop
* If a task makes a change that should trigger a website build,
* that task must set appContext.changed to true
*/
async function tasks (body: IWebhookBody, cb: Function) {
try {
await taskContinueOnlyIfPublished(appContext, body)
await taskAssertFfmpeg(appContext)
await taskAllocateMux(appContext)
await taskDownloadVideoSrcB2(appContext, body)
await taskAssertThumbnail(appContext, body)
await taskPinIpfsContent(appContext, body)
await taskAddVideoSrcHash(appContext, body)
await taskAddVideo240Hash(appContext, body)
await taskTriggerWebsiteBuild(appContext)
} catch (err: unknown) {
logger.log({ level: 'error', message: 'Error while running QA tasks' })
console.error(err)
// logger.log({ level: 'error', message: err.message })
// if (err instanceof ValidationError) {
// // Inside this block, err is known to be a ValidationError
// }
}
logger.log({ level: 'info', message: 'Tasks complete'})
cb(null, null)
}
// Declare a webhook route
// This is used by Strapi.
// When a new VOD is created, Strapi G the route
// QA responds by adding the IPFS hash
server.post('/webhook', { schema }, async (request, reply) => {
logger.log({ level: 'info', message: `Webhook was hit!` })
reply.type('application/json')
// we can use the `request.body` object to get the data sent by the client
// const result = await collection.insertOne({ animal: request.body.animal })
// return result
const body: IWebhookBody | undefined = request?.body as IWebhookBody;
if (body === undefined) {
reply.code(400)
return {
message: 'body must be defined, but it was undefined'
}
}
logger.log({ level: 'info', message: 'Queuing tasks' })
workQueue.push(body)
return {
message: 'ok'
}
})
// Run the server!
server.listen({
port: Number(process.env.PORT) || 5000, // Cast to number
host: '0.0.0.0', // Replace with your desired host
}).then((address) => {
logger.log({ level: 'info', message: `QA server ${version} in NODE_ENV ${appContext.env.NODE_ENV} listening on ${address}` });
}).catch((err) => {
console.error(err);
});

57
lib/b2.ts Normal file
View File

@ -0,0 +1,57 @@
import { GetObjectCommand, PutObjectCommand, S3Client } from '@aws-sdk/client-s3';
import cuid from 'cuid';
import path from 'node:path';
import fs from 'node:fs';
import { getVideoSrcB2LocalFilePath } from './fsCommon.js'
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
const urlPrefix = 'https://f000.backblazeb2.com/b2api/v1/b2_download_file_by_id?fileId='
// export async function downloadVideoSrcB2 (appContext, vod) {
// const localFilePath = getVideoSrcB2LocalFilePath(appContext, vod);
// const key = vod.attributes.videoSrcB2.data.attributes.key
// const s3 = new S3Client({
// endpoint: appContext.env.B2_ENDPOINT,
// region: appContext.env.B2_REGION,
// credentials: {
// accessKeyId: process.env.B2_KEY,
// secretAccessKey: process.env.B2_SECRET,
// },
// });
// var params = {Bucket: appContext.env.B2_BUCKET, Key: key};
// const s3Result = await s3.send(new GetObjectCommand(params));
// if (!s3Result.Body) {
// throw new Error('received empty body from S3');
// }
// await pipeline(s3Result.Body, fs.createWriteStream(localFilePath));
// return localFilePath
// }
export async function uploadToB2 (env: NodeJS.ProcessEnv, filePath: string) {
const keyName = `${cuid()}-${path.basename(filePath)}`
const bucketName = env.B2_BUCKET
console.log(`uploadToB2 begin. bucketName:${bucketName}`)
const s3 = new S3Client({
endpoint: env.B2_ENDPOINT,
region: env.B2_REGION,
credentials: {
accessKeyId: env.B2_KEY,
secretAccessKey: env.B2_SECRET,
}
});
var params = {Bucket: bucketName, Key: keyName, Body: fs.createReadStream(filePath)};
const res = await s3.send(new PutObjectCommand(params));
if (!res.VersionId) {
const msg = 'res was missing VersionId'
throw new Error(msg)
}
return {
uploadId: res.VersionId,
key: keyName,
url: `${urlPrefix}${res.VersionId}`
}
}

20
lib/b2File.ts Normal file
View File

@ -0,0 +1,20 @@
export interface IB2File {
id: number;
url: string;
key: string;
uploadId: string;
cdnUrl: string;
}
export function unmarshallB2File(d: any): IB2File | null {
if (!d) return null;
return {
id: d.id,
url: d.attributes.url,
key: d.attributes.key,
uploadId: d.attributes.uploadId,
cdnUrl: d.attributes?.cdnUrl
}
}

10
lib/boss.ts Normal file
View File

@ -0,0 +1,10 @@
import PgBoss from 'pg-boss';
const boss = function(DSN) {
const b = new PgBoss(DSN);
return b;
}
export default boss;

2
lib/constants.ts Normal file
View File

@ -0,0 +1,2 @@
export const ipfsHashRegex = /Qm[1-9A-HJ-NP-Za-km-z]{44,}|b[A-Za-z2-7]{58,}|B[A-Z2-7]{58,}|z[1-9A-HJ-NP-Za-km-z]{48,}|F[0-9A-F]{50,}/;
export const strapiUrl = (process.env.NODE_ENV === 'production') ? 'https://portal.futureporn.net' : 'http://localhost:1337'

18
lib/dates.ts Normal file
View File

@ -0,0 +1,18 @@
import { format, formatInTimeZone, utcToZonedTime, toDate, zonedTimeToUtc } from 'date-fns-tz';
import { parse } from 'date-fns';
const safeDateFormatString: string = "yyyyMMdd'T'HHmmss'Z'"
const localTimeZone = Intl.DateTimeFormat().resolvedOptions().timeZone;
export function getSafeDate (text: string): string {
const date = utcToZonedTime(text, 'UTC');
return format(date, safeDateFormatString, { timeZone: 'UTC' });
}
export function getDateFromSafeDate(safeDate: string): Date {
const date = parse(safeDate, safeDateFormatString, new Date())
const utcDate = zonedTimeToUtc(date, 'UTC')
return utcDate;
}

10
lib/fsCommon.ts Normal file
View File

@ -0,0 +1,10 @@
import path from 'node:path';
import fs from 'node:fs';
import { IVod } from './vods.js'
export function getVideoSrcB2LocalFilePath (env: NodeJS.ProcessEnv, vod: IVod) {
if (!vod?.videoSrcB2?.key) throw new Error(`vod is missing videoSrcB2.key which is required to download`)
const key = vod.videoSrcB2.key
const localFilePath = path.join(env.TMPDIR, key)
return localFilePath
}

6
lib/random.ts Normal file
View File

@ -0,0 +1,6 @@
export function getRandomInt(min, max) {
min = Math.ceil(min);
max = Math.floor(max);
return Math.floor(Math.random() * (max - min + 1)) + min;
}

98
lib/tag-vod-relations.ts Normal file
View File

@ -0,0 +1,98 @@
import qs from 'qs';
import { strapiUrl } from './constants.js'
import { unmarshallTag, ITag, IToyTag } from './tags.js';
import { IVod } from './vods.js';
export interface ITagVodRelation {
id: number;
tag: ITag | IToyTag;
vod: IVod;
}
export interface ITagVodRelations {
data: ITagVodRelation[];
pagination: {
page: number;
pageSize: number;
pageCount: number;
total: number;
}
}
export function unmarshallTagVodRelation(d: any): ITagVodRelation {
return {
id: d.id,
tag: unmarshallTag(d.attributes.tag.data),
vod: d.attributes.vod
}
}
export async function getTagVodRelationsForVtuber(vtuberId: number, page: number = 1, pageSize: number = 25): Promise<ITagVodRelations> {
// get the tag-vod-relations where the vtuber is the vtuber we are interested in.
const query = qs.stringify(
{
populate: {
tag: {
fields: ['id', 'name'],
populate: {
toy: {
fields: ['linkTag', 'make', 'model', 'image2'],
populate: {
linkTag: {
fields: ['name']
}
}
}
}
},
vod: {
populate: {
vtuber: {
fields: ['slug']
}
}
}
},
filters: {
vod: {
vtuber: {
id: {
$eq: vtuberId
}
}
},
tag: {
toy: {
linkTag: {
name: {
$notNull: true
}
}
}
}
},
pagination: {
page: page,
pageSize: pageSize
},
sort: {
id: 'desc'
}
}
)
// we need to return an IToys object
// to get an IToys object, we have to get a list of toys from tvrs.
return fetch(`${strapiUrl}/api/tag-vod-relations?${query}`)
.then((res) => res.json())
.then((j) => {
const tvrs = {
data: j.data.map(unmarshallTagVodRelation),
pagination: j.meta.pagination
}
return tvrs
})
}

71
lib/tags.ts Normal file
View File

@ -0,0 +1,71 @@
import { strapiUrl } from './constants.js'
import { IVod } from './vods.js';
import { unmarshallToy, IToy } from './toys.js';
export interface ITag {
id: number;
name: string;
count: number;
}
export interface IToyTag extends ITag {
toy: IToy;
}
export function unmarshallTag(d: any): ITag | IToyTag {
const tag: ITag = {
id: d.id,
name: d.attributes.name,
count: 0 // count gets updated later
};
if (d.attributes.toy && d.attributes.toy.data) {
const toy: IToy = unmarshallToy(d.attributes.toy.data);
const toyTag: IToyTag = { ...tag, toy };
return toyTag;
}
return tag;
}
// export async function getTags(): Promise<ITag[]> {
// const tagVodRelations = await fetchPaginatedData('/api/tag-vod-relations', 100, { 'populate[0]': 'tag', 'populate[1]': 'vod' });
// // Create a Map to store tag data, including counts and IDs
// const tagDataMap = new Map<string, { id: number, count: number }>();
// // Populate the tag data map with counts and IDs
// tagVodRelations.forEach(tvr => {
// const tagName = tvr.attributes.tag.data.attributes.name;
// const tagId = tvr.attributes.tag.data.id;
// if (!tagDataMap.has(tagName)) {
// tagDataMap.set(tagName, { id: tagId, count: 1 });
// } else {
// const existingData = tagDataMap.get(tagName);
// if (existingData) {
// tagDataMap.set(tagName, { id: existingData.id, count: existingData.count + 1 });
// }
// }
// });
// // Create an array of Tag objects with id, name, and count
// const tags: ITag[] = Array.from(tagDataMap.keys()).map(tagName => {
// const tagData = tagDataMap.get(tagName);
// return {
// id: tagData ? tagData.id : -1,
// name: tagName,
// count: tagData ? tagData.count : 0,
// };
// });
// return tags;
// }

70
lib/toys.ts Normal file
View File

@ -0,0 +1,70 @@
import { ITag } from './tags.js'
export interface IToys {
data: IToy[];
pagination: {
page: number;
pageSize: number;
pageCount: number;
total: number;
}
}
export interface IToy {
id: number;
tags: ITag[];
linkTag: ITag;
make: string;
model: string;
aspectRatio: string;
image2: string;
}
interface IToysListProps {
toys: IToys;
page: number;
pageSize: number;
}
/** This endpoint doesn't exist at the moment, but definitely could in the future */
// export function getUrl(toy: IToy): string {
// return `${siteUrl}/toy/${toy.name}`
// }
// export function getToysForVtuber(vtuberId: number, page: number = 1, pageSize: number = 25): Promise<IToys> {
// const tvrs = await getTagVodRelationsForVtuber(vtuberId, page, pageNumber);
// return {
// data: tvrs.data.
// pagination: tvrs.pagination
// }
// }
export function unarshallLinkTag(d: any): ITag {
return {
id: d.id,
name: d.attributes?.name,
count: 0
}
}
export function unmarshallToy(d: any): IToy {
const toy = {
id: d.id,
// tags: d.attributes.tags.data.map(unmarshallTag),
// linkTag: unmarshallTag(d.attributes.linkTag)
tags: [],
make: d.attributes.make,
model: d.attributes.model,
aspectRatio: d.attributes.aspectRatio,
image2: d.attributes.image2,
linkTag: unarshallLinkTag(d.attributes?.linkTag?.data),
}
return toy
}

257
lib/vods.ts Normal file
View File

@ -0,0 +1,257 @@
import { strapiUrl } from './constants.js'
import { getDateFromSafeDate, getSafeDate } from './dates.js'
import { unmarshallVtuber, IVtuber } from './vtubers.js'
import qs from 'qs'
import { ITagVodRelation, unmarshallTagVodRelation } from './tag-vod-relations.js'
import { IB2File, unmarshallB2File } from './b2File.js'
import fetch from 'node-fetch'
export interface IMuxAsset {
playbackId: string;
assetId: string;
}
export interface IVods {
data: IVod[];
pagination: {
page: number;
pageSize: number;
pageCount: number;
total: number;
}
}
export interface IMarshalledVod {
data: {
attributes: {
pledge_sum: number;
patron_count: number;
}
}
meta: {
pagination: {
total: number;
}
}
}
export interface IVod {
id: number;
title?: string;
date: string;
date2: string;
muxAsset: IMuxAsset;
thumbnail: IB2File | null;
vtuber: IVtuber;
tagVodRelations: ITagVodRelation[];
video240Hash: string;
videoSrcHash: string;
videoSrcB2: IB2File | null;
announceTitle: string;
announceUrl: string;
note: string;
}
export function unmarshallVod(d: any): IVod {
console.log('unmarshall following ')
console.log(d)
if (!d?.attributes) {
throw new Error(`panick! vod doesnt have an attributes`)
}
if (!d.attributes?.vtuber?.data) {
throw new Error("panick! vod data doesn't contain vtuber. please populate.")
}
const vod = {
id: d.id,
title: d.attributes.title,
date: d.attributes.date,
date2: d.attributes.date2,
muxAsset: {
playbackId: d.attributes?.muxAsset?.data?.attributes?.playbackId,
assetId: d.attributes?.muxAsset?.data?.attributes?.assetId,
},
thumbnail: unmarshallB2File(d.attributes?.thumbnail?.data),
vtuber: unmarshallVtuber(d.attributes?.vtuber?.data),
tagVodRelations: d.attributes?.tagVodRelations?.data.map(unmarshallTagVodRelation),
video240Hash: d.attributes?.video240Hash,
videoSrcHash: d.attributes?.videoSrcHash,
videoSrcB2: unmarshallB2File(d.attributes?.videoSrcB2?.data),
announceTitle: d.attributes.announceTitle,
announceUrl: d.attributes.announceUrl,
note: d.attributes.note,
}
return vod
}
export async function getVodForDate(date: Date): Promise<IVod> {
const iso8601DateString = date.toISOString().split('T')[0];
const query = qs.stringify(
{
filters: {
date: {
$eq: date.toISOString()
}
},
populate: {
vtuber: {
fields: ['slug', 'displayName', 'image', 'imageBlur', 'themeColor']
},
muxAsset: {
fields: ['playbackId', 'assetId']
},
thumbnail: {
fields: ['cdnUrl', 'url']
},
tagVodRelations: {
fields: ['tag'],
populate: ['tag']
},
videoSrcB2: {
fields: ['url', 'key', 'uploadId', 'cdnUrl']
}
},
// populate: '*'
}
)
return fetch(`${strapiUrl}/api/vods?${query}`)
.then((res) => res.json())
.then((d: any) => d.data[0])
.then(unmarshallVod)
}
export async function getRandomVod(): Promise<IVod | null> {
try {
console.log(`>>>> getRandomVod ${strapiUrl}/api/vod/random`)
const d = await fetch(`${strapiUrl}/api/vod/random`)
.then((res) => res.json()) as IVod;
return d;
} catch (e) {
console.error(`There was an error while fetching random vod`);
console.error(e);
return null;
}
}
export async function getVod(id: number): Promise<IVod | null> {
console.log(`we are now making a network fetch for vod ${id} (${typeof id})`)
const query = qs.stringify(
{
filters: {
id: {
$eq: id
}
},
populate: {
vtuber: {
fields: ['slug', 'displayName', 'image', 'imageBlur']
},
muxAsset: {
fields: ['playbackId', 'assetId']
},
thumbnail: {
fields: ['cdnUrl', 'url']
},
tagVodRelations: {
fields: ['tag'],
populate: ['tag']
},
videoSrcB2: {
fields: ['url', 'key', 'uploadId', 'cdnUrl']
}
}
}
)
try {
const d = await fetch(`${strapiUrl}/api/vods?${query}`)
.then((res) => res.json())
.then((data) => {
console.log('here is teh data')
console.log(data)
return data
})
.then((data: any) => unmarshallVod(data.data[0]))
return d
} catch (e) {
console.error(`there was an error while fetching vod ${id}`)
console.error(e)
return null
}
}
export async function getVods(page: number = 1, pageSize: number = 25, sortDesc = true): Promise<IVods> {
const query = qs.stringify(
{
populate: {
vtuber: {
fields: ['slug', 'displayName', 'image', 'imageBlur']
},
muxAsset: {
fields: ['playbackId', 'assetId']
},
thumbnail: {
fields: ['cdnUrl', 'url']
},
tagVodRelations: {
fields: ['tag'],
populate: ['tag']
},
videoSrcB2: {
fields: ['url', 'key', 'uploadId', 'cdnUrl']
}
},
sort: {
date: (sortDesc) ? 'desc' : 'asc'
},
pagination: {
pageSize: pageSize,
page: page
}
}
)
return fetch(`${strapiUrl}/api/vods?${query}`)
.then((res) => res.json())
.then((j: any) => (
{
data: j.data.map(unmarshallVod),
pagination: j.meta.pagination
}
))
}
/**
* This returns stale data, because futureporn-historian is broken.
* @todo get live data from historian
* @see https://git.futureporn.net/futureporn/futureporn-historian/issues/1
*/
export async function getProgress(vtuberSlug: string): Promise<{ complete: number; total: number }> {
const query = qs.stringify({
filters: {
vtuber: {
slug: {
$eq: vtuberSlug
}
}
}
})
const data = await fetch(`${strapiUrl}/api/vods?${query}`)
.then((res) => res.json() as any)
const total: number = data.meta.pagination.total
return {
complete: total,
total: total
}
}

113
lib/vtubers.ts Normal file
View File

@ -0,0 +1,113 @@
import { IVod } from './vods.js'
import { strapiUrl } from './constants.js';
import { getSafeDate } from './dates.js';
import qs from 'qs';
export interface IVtuber {
id: number;
slug: string;
displayName: string;
chaturbate?: string;
twitter?: string;
patreon?: string;
twitch?: string;
tiktok?: string;
onlyfans?: string;
youtube?: string;
linktree?: string;
carrd?: string;
fansly?: string;
pornhub?: string;
discord?: string;
reddit?: string;
throne?: string;
instagram?: string;
facebook?: string;
merch?: string;
vods: IVod[];
description1: string;
description2?: string;
image: string;
imageBlur?: string;
themeColor: string;
}
export function unmarshallVtuber(d: any): IVtuber {
if (!d) {
console.error('panick! unmarshallVTuber was called with undefined data')
console.trace()
}
return {
id: d.id,
slug: d.attributes?.slug,
displayName: d.attributes.displayName,
chaturbate: d.attributes?.chaturbate,
twitter: d.attributes?.twitter,
patreon: d.attributes?.patreon,
twitch: d.attributes?.twitch,
tiktok: d.attributes?.tiktok,
onlyfans: d.attributes?.onlyfans,
youtube: d.attributes?.youtube,
linktree: d.attributes?.linktree,
carrd: d.attributes?.carrd,
fansly: d.attributes?.fansly,
pornhub: d.attributes?.pornhub,
discord: d.attributes?.discord,
reddit: d.attributes?.reddit,
throne: d.attributes?.throne,
instagram: d.attributes?.instagram,
facebook: d.attributes?.facebook,
merch: d.attributes?.merch,
description1: d.attributes.description1,
description2: d.attributes?.description2,
image: d.attributes.image,
imageBlur: d.attributes?.imageBlur,
themeColor: d.attributes.themeColor,
vods: d.attributes.vods
}
}
export async function getVtuberBySlug(slug: string): Promise<IVtuber> {
const query = qs.stringify(
{
filters: {
slug: {
$eq: slug
}
},
// populate: {
// vods: {
// fields: ['id', 'videoSrcHash'],
// populate: ['vtuber']
// }
// }
}
)
return fetch(`${strapiUrl}/api/vtubers?${query}`)
.then((res) => res.json())
.then((d) => {
const vtuberData = d.data[0]
return unmarshallVtuber(vtuberData)
})
}
export async function getVtuberById(id: number): Promise<IVtuber> {
return fetch(`${strapiUrl}/api/vtubers?filters[id][$eq]=${id}`)
.then((res) => res.json())
.then((data) => {
return unmarshallVtuber(data.data[0])
})
}
export async function getVtubers(): Promise<IVtuber[]> {
return fetch(`${strapiUrl}/api/vtubers`)
.then((res) => res.json())
.then((data) => {
return data.data.map((d: IVtuber) => unmarshallVtuber(d))
})
}

30
manager.ts Normal file
View File

@ -0,0 +1,30 @@
import * as dotenv from 'dotenv'
import PgBoss from 'pg-boss';
dotenv.config()
if (!process.env.PG_DSN) throw new Error('PG_DSN is missing in env')
const PG_DSN = process.env.PG_DSN
async function main () {
const boss = new PgBoss({
connectionString: PG_DSN,
monitorStateIntervalSeconds: 5
});
const s = await boss.start();
s.on('monitor-states', (states) => {
const printQueueStats = (queueName, queue) => {
console.log(` ${queueName} created:${queue.created}, retry:${queue.retry}, active:${queue.active}, completed:${queue.completed}, failed:${queue.failed}`);
};
const { queues, active, created, completed, failed } = states;
console.log(`Jobs active:${active}, created:${created}, completed:${completed}, failed:${failed}`);
printQueueStats('generateThumbnail', queues.generateThumbnail);
printQueueStats('createMuxAsset', queues.createMuxAsset);
});
await boss.send('identifyVodIssues', { priority: 10 });
await boss.schedule('identifyVodIssues', '* * * * *', { priority: 10 }); // every minute
}
main()

View File

@ -6,8 +6,15 @@
"private": true,
"dependencies": {
"@11ty/eleventy-fetch": "^4.0.0",
"@apollo/client": "^3.8.4",
"@aws-sdk/client-s3": "^3.328.0",
"@prisma/client": "^5.3.1",
"@taskr/esnext": "^1.1.0",
"@types/node": "^20.6.5",
"apollo-client": "^2.6.10",
"bullmq": "^4.11.4",
"cuid": "^3.0.0",
"date-fns": "^2.30.0",
"discord.js": "^14.9.0",
"dotenv": "^16.0.3",
"execa": "^7.1.1",
@ -16,18 +23,27 @@
"ffbinaries": "^1.1.5",
"fluent-json-schema": "^4.1.0",
"formdata-node": "^5.0.0",
"foy": "^0.2.20",
"got": "^12.6.0",
"graphql-request": "^6.0.0",
"lodash-es": "^4.17.21",
"prevvy": "^5.0.1",
"winston": "^3.8.2"
"node-fetch": "^3.3.2",
"pg-boss": "^9.0.3",
"prevvy": "^6.0.0",
"qs": "^6.11.2",
"ts-node": "^10.9.1",
"ts-node-esm": "^0.0.6",
"winston": "^3.8.2",
"yargs": "^17.7.2"
},
"type": "module",
"scripts": {
"start": "node index",
"test": "mocha",
"tunnel": "wg-quick up ./tunnel.conf; echo 'press enter to close tunnel'; read _; wg-quick down ./tunnel.conf",
"dev": "nodemon --watch lib --watch .env --watch package.json --watch index.js index.js"
"dev:manager": "nodemon --watch manager.ts --watch .env --watch package.json --watch tasks --ext ts,json,js --exec \"node --inspect --loader ts-node/esm ./manager.ts\"",
"dev:worker": "nodemon --watch worker.ts --watch .env --watch package.json --watch tasks --ext ts,json,js --exec \"node --inspect --loader ts-node/esm ./worker.ts\"",
"dev:old": "nodemon --watch lib --watch .env --watch package.json --watch index.js index.js"
},
"devDependencies": {
"chai": "^4.3.7",

File diff suppressed because it is too large Load Diff

257
src/Cluster.ts Normal file
View File

@ -0,0 +1,257 @@
import dotenv from 'dotenv'
dotenv.config()
import got from 'got'
import https from 'https'
import path from 'node:path'
import { FormData } from 'formdata-node'
import fs, { createWriteStream } from 'node:fs'
import { pipeline } from 'node:stream'
import { promisify } from 'node:util'
import { fileFromPath } from "formdata-node/file-from-path"
import { loggerFactory } from './logger.js'
// const ipfsClusterExecutable = '/usr/local/bin/ipfs-cluster-ctl'
// const ipfsClusterUri = 'https://cluster.sbtp.xyz:9094'
const IPFS_CLUSTER_HTTP_API_USERNAME = process.env.IPFS_CLUSTER_HTTP_API_USERNAME;
const IPFS_CLUSTER_HTTP_API_PASSWORD = process.env.IPFS_CLUSTER_HTTP_API_PASSWORD;
const IPFS_CLUSTER_HTTP_API_MULTIADDR = process.env.IPFS_CLUSTER_HTTP_API_MULTIADDR;
if (typeof IPFS_CLUSTER_HTTP_API_USERNAME === 'undefined') throw new Error('IPFS_CLUSTER_HTTP_API_USERNAME in env is undefined');
if (typeof IPFS_CLUSTER_HTTP_API_PASSWORD === 'undefined') throw new Error('IPFS_CLUSTER_HTTP_API_PASSWORD in env is undefined');
if (typeof IPFS_CLUSTER_HTTP_API_MULTIADDR === 'undefined') throw new Error('IPFS_CLUSTER_HTTP_API_MULTIADDR in env is undefined');
const logger = loggerFactory({
defaultMeta: {
service: 'futureporn/common'
}
})
const getArgs = function () {
let args = [
'--no-check-certificate',
'--host', IPFS_CLUSTER_HTTP_API_MULTIADDR,
'--basic-auth', `${IPFS_CLUSTER_HTTP_API_USERNAME}:${IPFS_CLUSTER_HTTP_API_PASSWORD}`
]
return args
}
const getHttpsAgent = () => {
const httpsAgent = new https.Agent({
rejectUnauthorized: false
});
return httpsAgent
}
const fixInvalidJson = (invalidJson) => {
return invalidJson
.split('\n')
.filter((i) => i !== '')
.map((datum) => JSON.parse(datum))
}
/**
* query the cluster for a list of all the pins
*
* @resolves {String}
*/
const ipfsClusterPinsQuery = async () => {
const httpsAgent = getHttpsAgent()
const res = await fetch(`${ipfsClusterUri}/pins?stream-channels=false`, {
headers: {
'Authorization': `Basic ${Buffer.from(IPFS_CLUSTER_HTTP_API_USERNAME+':'+IPFS_CLUSTER_HTTP_API_PASSWORD, "utf-8").toString("base64")}`
},
agent: httpsAgent
})
const b = await res.text()
const c = b.split('\n')
const d = c.filter((i) => i !== '')
const e = d.map((datum) => JSON.parse(datum))
return e
}
const ipfsClusterStatus = async (pin) => {
const httpsAgent = getHttpsAgent()
const res = await fetch(`${ipfsClusterUri}/pins/${pin}`, {
headers: {
'Authorization': `Basic ${Buffer.from(IPFS_CLUSTER_HTTP_API_USERNAME+':'+IPFS_CLUSTER_HTTP_API_PASSWORD, "utf-8").toString("base64")}`
},
agent: httpsAgent
})
const b = await res.text()
return fixInvalidJson(b)
}
const ipfsClusterStatusAll = async (pin) => {
const httpsAgent = getHttpsAgent()
const res = await fetch(`${ipfsClusterUri}/pins`, {
headers: {
'Authorization': `Basic ${Buffer.from(IPFS_CLUSTER_HTTP_API_USERNAME+':'+IPFS_CLUSTER_HTTP_API_PASSWORD, "utf-8").toString("base64")}`
},
agent: httpsAgent
})
const b = await res.text()
return fixInvalidJson(b)
}
function countPinnedStatus(obj) {
let count = 0;
console.log(obj.peer_map)
for (let key in obj.peer_map) {
console.log(`comparing ${obj.peer_map[key].status}`)
if (obj.peer_map[key].status === "pinned") {
count++;
}
}
return count;
}
export default class Cluster {
constructor(opts) {
this.username = opts.username
this.password = opts.password
this.uri = opts.uri || 'https://cluster.sbtp.xyz:9094'
if (typeof this.username === 'undefined') throw new Error('username not defined');
if (typeof this.password === 'undefined') throw new Error('password not defined');
}
/**
*
* adds pin(s) to the cluster.
*/
async pinAdd(cid) {
if (Array.isArray(cid)) {
const results = await Promise.all(cid.map((cid) => this.pinAdd(cid)));
return results;
}
if (!cid) return;
const opts = {
https: { rejectUnauthorized: false },
headers: {
'Accept': '*/*',
'Authorization': `Basic ${Buffer.from(this.username+':'+this.password).toString('base64')}`
},
isStream: false
}
const res = await got.post(
`${this.uri}/pins/${cid}?stream-channels=false`,
opts
);
if (res.ok) {
return cid
}
}
async getPinCount(cid) {
if (Array.isArray(cid)) throw new Error('getPinCount only supports a string CID as argument')
const res = await this.pinStatus(cid);
let count = 0;
for (const peer of Object.values(res.peer_map)) {
if (peer.status === 'pinned') {
count++;
}
}
return count;
}
async pinStatus(cid) {
if (!cid) throw new Error('required arg cid was not defined');
const opts = {
timeout: {
request: 60000,
},
https: { rejectUnauthorized: false },
headers: {
'Accept': '*/*',
'Authorization': `Basic ${Buffer.from(this.username+':'+this.password).toString('base64')}`
},
isStream: false
};
try {
const json = await got.get(`${this.uri}/pins/${cid}?stream-channels=false`, opts).json();
return json;
} catch (error) {
console.error('THERE WAS AN ERROR');
console.error(error);
}
}
async add (filename, fileSize) {
const streamPipeline = promisify(pipeline);
const form = new FormData()
form.set('file', await fileFromPath(filename))
const opts = {
https: { rejectUnauthorized: false },
body: form,
headers: {
'Accept': '*/*',
'Authorization': `Basic ${Buffer.from(this.username+':'+this.password).toString('base64')}`
},
isStream: true
}
for (let i = 0; i < 5; i++) {
let bytesReport = 0
let timer
let output
try {
timer = setInterval(() => {
if (typeof fileSize !== 'undefined') {
logger.log({ level: 'info', message: `adding to IPFS. Progress: ${(bytesReport/fileSize*100).toFixed(2)}%`})
} else {
logger.log({ level: 'info', message: `adding to IPFS. Bytes transferred: ${bytesReport}` })
}
}, 60000*5)
logger.log({ level: 'info', message: `Adding ${filename} to IPFS cluster. Attempt ${i+1}` });
const res = await got.post(`${this.uri}/add?cid-version=1&progress=1`, opts);
// progress updates are streamed from the cluster
// for each update, just display it
// when a cid exists in the output, it's done.
for await (const chunk of res) {
const data = JSON.parse(chunk.toString());
bytesReport = data?.bytes
if (data?.cid) {
clearInterval(timer)
return data.cid;
}
}
} catch (e) {
logger.log({ level: 'error', message: `error while uploading! ${e}` });
if (i < 4) {
logger.log({ level: 'info', message: `Retrying the upload...` });
}
clearInterval(timer)
}
}
}
}

22
src/blah.js Normal file
View File

@ -0,0 +1,22 @@
/**
* a QA module needs the following things
*
*
* - check {function} - the function that runs to check if action is needed. returns true
* - isChangesNeeded {boolean} - whether or not action is needed
* - act {function} - the function that runs to make the necessary changes
*
*/
export async function isActionNeeded
export const qaModule = {
check: function (appContext, body) {
},
}

13
src/blah.ts Normal file
View File

@ -0,0 +1,13 @@
/**
* a QA module needs the following things
*
*
* - check {function} - the function that runs to check if action is needed
* - isChangesNeeded {boolean} - whether or not action is needed
* - act {function} - the function that runs to make the necessary changes
*
*/

67
src/ffmpeg.ts Normal file
View File

@ -0,0 +1,67 @@
import { execa } from 'execa';
import path, { join } from 'node:path';
const reportInterval = 60000
async function getTotalFrameCount (filename) {
const { exitCode, killed, stdout, stderr } = await execa('ffprobe', [
'-v', 'error',
'-select_streams', 'v:0',
'-show_entries', 'stream=nb_frames',
'-of', 'default=nokey=1:noprint_wrappers=1',
filename
])
if (exitCode !== 0 || killed !== false) {
throw new Error(`problem while getting frame count. exitCode:${exitCode}, killed:${killed}, stdout:${stdout}, stderr:${stderr}`);
}
return parseInt(stdout)
}
/**
* @param {string} input
* @resolves {string} output
*/
export async function get240Transcode (appContext, filename) {
if (typeof filename === 'undefined') throw new Error('filename is undefined');
const progressFilePath = path.join(appContext.env.TMPDIR, 'ffmpeg-progress.log')
const outputFilePath = path.join(appContext.env.TMPDIR, path.basename(filename, '.mp4')+'_240p.mp4')
const totalFrames = await getTotalFrameCount(filename)
appContext.logger.log({ level: 'debug', message: `transcoding ${filename} to ${outputFilePath} and saving progress log to ${progressFilePath}` })
let progressReportTimer = setInterval(async () => {
try {
const frame = await getLastFrameNumber(progressFilePath)
appContext.logger.log({ level: 'info', message: `transcoder progress-- ${(frame/totalFrames*100).toFixed(2)}%` })
} catch (e) {
appContext.logger.log({ level: 'info', message: 'we got an error thingy while reading the ffmpeg-progress log but its ok we can just ignore and try again later.' })
}
}, reportInterval)
const { exitCode, killed, stdout, stderr } = await execa('ffmpeg', [
'-y',
'-i', filename,
'-vf', 'scale=w=-2:h=240',
'-b:v', '386k',
'-b:a', '45k',
'-progress', progressFilePath,
outputFilePath
]);
if (exitCode !== 0 || killed !== false) {
throw new RemuxError(`exitCode:${exitCode}, killed:${killed}, stdout:${stdout}, stderr:${stderr}`);
}
appContext.logger.log({ level: 'info', message: 'transcode COMPLETE!' })
clearInterval(progressReportTimer)
return outputFilePath
}
export const getFilename = (appContext, roomName) => {
const name = `${roomName}_${new Date().toISOString()}.ts`
return join(appContext.env.FUTUREPORN_WORKDIR, 'recordings', name);
}

64
src/fleek.ts Normal file
View File

@ -0,0 +1,64 @@
import { request, gql } from 'graphql-request';
import { getFleek } from './strapi.js';
import { debounce } from 'lodash-es';
export async function triggerWebsiteBuild(appContext) {
appContext.logger.log({ level: 'info', message: 'fleek.triggerWebsiteBuild() was executed. Will it actually run? that\'s up to debounce.' })
appContext.build = (appContext.build) ? appContext.build : debounce(() => {
__triggerWebsiteBuild(appContext)
}, 1000*60*30, { leading: true })
appContext.build()
}
export async function __triggerWebsiteBuild(appContext) {
const fleek = await getFleek(appContext)
const { jwt, siteId, endpoint, lastBuild } = fleek.attributes
console.log(`jwt:${jwt}, siteId:${siteId}, endpoint:${endpoint}`)
const document = gql`
mutation triggerDeploy($siteId: ID!) {
triggerDeploy(siteId: $siteId) {
...DeployDetail
__typename
}
}
fragment DeployDetail on Deploy {
id
startedAt
completedAt
status
ipfsHash
log
published
previewImage
repository {
url
name
owner
branch
commit
message
__typename
}
gitEvent
pullRequestUrl
taskId
__typename
}
`
const headers = {
'authorization': `Bearer ${jwt}`
}
const variables = {
siteId,
}
await request(
endpoint,
document,
variables,
headers
)
}

28
src/logger.ts Normal file
View File

@ -0,0 +1,28 @@
import winston from 'winston'
interface ILoggerOptions {
service: string;
}
export const loggerFactory = (options: ILoggerOptions) => {
const mergedOptions = Object.assign({}, {
level: 'info',
defaultMeta: { service: 'futureporn' }
}, options)
const logger = winston.createLogger(mergedOptions);
if (process.env.NODE_ENV !== 'production') {
logger.add(new winston.transports.Console({
level: 'debug',
format: winston.format.simple()
}))
} else {
logger.add(new winston.transports.Console({
level: 'info',
format: winston.format.json()
}))
}
return logger
}

33
src/strapi.ts Normal file
View File

@ -0,0 +1,33 @@
import { got } from 'got'
export async function getVod (appContext, vodId) {
const { data } = await got.get(`${appContext.env.STRAPI_URL}/api/vods/${vodId}?populate=*`, {
headers: {
'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
}
}).json()
return data
}
export async function updateVod (appContext, vodId, data) {
const { data: output } = await got.put(`${appContext.env.STRAPI_URL}/api/vods/${vodId}`, {
headers: {
'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
},
json: {
data
}
}).json()
return output
}
export async function getFleek (appContext) {
const { data } = await got.get(`${appContext.env.STRAPI_URL}/api/fleek`, {
headers: {
'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
}
}).json()
return data
}

View File

@ -0,0 +1,27 @@
import { getVod, updateVod } from './strapi.js'
import { getVideoSrcB2LocalFilePath } from './fsCommon.js'
import { get240Transcode } from './ffmpeg.js'
export default async function taskAddVideo240Hash(appContext, body) {
appContext.logger.log({ level: 'info', message: `[TASK] AddVideo240Hash begin` })
if (body.model === 'vod') {
if (body?.entry?.publishedAt) {
const vod = await getVod(appContext, body.entry.id)
const video240Hash = vod?.attributes?.video240Hash
if (!video240Hash) {
const videoSrcB2LocalFilePath = getVideoSrcB2LocalFilePath(appContext, vod)
const video240LocalFilePath = await get240Transcode(appContext, videoSrcB2LocalFilePath)
const cid = await appContext.cluster.add(video240LocalFilePath)
const data = await updateVod(appContext, vod.id, { video240Hash: cid })
appContext.changed = true
appContext.logger.log({ level: 'info', message: `Added ${cid} as video240Hash` })
} else {
appContext.logger.log({ level: 'debug', message: 'Doing nothing-- video240Hash already exists'})
}
} else {
appContext.logger.log({ level: 'debug', message: 'Doing nothing-- vod is not published.'})
}
} else {
appContext.logger.log({ level: 'debug', message: 'Doing nothing-- entry is not a vod.'})
}
}

View File

@ -0,0 +1,42 @@
import { getVod, updateVod } from './strapi.js'
import { getVideoSrcB2LocalFilePath } from './fsCommon.js'
export default async function taskAddVideoSrcHash(appContext, body) {
// if vod
// if published
// if missing videoSrcHash
// ipfs add vod.videoSrcB2 => videoSrcHash
// logger.info added
// else
// logger.debug doing nothing-- videoSrcHash already exists
// //if missing video240Hash // too slow, adding in a different task
// // transcode
// // ipfs add /tmp/vod-1-240.mp4 => video240Hash
// //else
// // logger.debug doing nothing-- video240Hash already exists
// else
// logger.debug "doing nothing-- not published"
// else
// logger.debug "doing nothing-- not a vod"
appContext.logger.log({ level: 'info', message: `[TASK] AddVideoSrcHash begin` })
if (body.model === 'vod') {
if (body?.entry?.publishedAt) {
const vod = await getVod(appContext, body.entry.id)
const videoSrcHash = vod?.attributes?.videoSrcHash
appContext.logger.log({ level: 'debug', message: `>>>>>>here is the videoSrcHash:${videoSrcHash}`})
if (!videoSrcHash) {
const cid = await appContext.cluster.add(getVideoSrcB2LocalFilePath(appContext, vod))
const data = await updateVod(appContext, vod.id, { videoSrcHash: cid })
appContext.changed = true
appContext.logger.log({ level: 'info', message: `Added ${cid} as videoSrcHash` })
} else {
appContext.logger.log({ level: 'debug', message: 'Doing nothing-- videoSrcHash already exists'})
}
} else {
appContext.logger.log({ level: 'debug', message: 'Doing nothing-- entry is not published.'})
}
} else {
appContext.logger.log({ level: 'debug', message: 'Doing nothing-- entry is not a vod.'})
}
}

212
src/taskAllocateMux.ts Normal file
View File

@ -0,0 +1,212 @@
import { got } from 'got';
import EleventyFetch from "@11ty/eleventy-fetch";
interface ApiResponse {
data: any[]; // Modify 'any[]' to match the actual structure of your API response
}
export async function getPatreonCampaign() {
return EleventyFetch('https://www.patreon.com/api/campaigns/8012692', {
duration: "1d",
type: "json",
})
}
export async function getPatreonCampaignPledgeSum() {
const campaign = await getPatreonCampaign()
return campaign.data.attributes.pledge_sum
}
/**
* Calculate how many mux allocations the site should have, based on the dollar amount of pledges from patreon
*
* @param {Number} pledgeSum - USD cents
*/
export function getMuxAllocationCount(pledgeSum) {
const dollarAmount = pledgeSum / 100; // convert USD cents to USD dollars
const muxAllocationCount = Math.floor(dollarAmount / 50); // calculate the number of mux allocations required
return muxAllocationCount;
}
export async function getAllPublishedVodsSortedByDate(appContext): Promise<any[]> {
try {
const response: ApiResponse = await got.get(`${appContext.env.STRAPI_URL}/api/vods?sort[0]=date%3Adesc&populate[0]=muxAsset&populate[1]=videoSrcB2`, {
headers: {
'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
}
}).json();
if (Array.isArray(response.data)) {
return response.data;
} else {
// Handle the case where 'data' is not an array
throw new Error('API response does not contain a data array.');
}
} catch (error) {
// Handle the error, log it, or throw a custom error
console.error('Error fetching data:', error);
throw new Error('Failed to fetch data from the API');
}
}
// export async function getAllPublishedVodsSortedByDate(appContext) {
// const { data } = await got.get(`${appContext.env.STRAPI_URL}/api/vods?sort[0]=date%3Adesc&populate[0]=muxAsset&populate[1]=videoSrcB2`, {
// headers: {
// 'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
// }
// }).json()
// return data
// }
export async function idempotentlyAddMuxToVod(appContext, vod) {
if (!vod?.attributes?.videoSrcB2?.data?.attributes?.url) throw new Error(`vod is missing videoSrcB2 url which is required to add to Mux`);
const isActNeeded = (!vod?.attributes?.muxAsset?.data?.attributes?.playbackId) ? true : false
if (isActNeeded) {
// const { data: muxData } = await got.post(`${appContext.env.STRAPI_API_KEY}/mux-video-uploader/submitRemoteUpload`, {
// headers: {
// 'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
// },
// form: {
// title: vod.attributes.date,
// url: vod.attributes.videoSrcB2.url
// }
// }).json()
appContext.logger.log({ level: 'debug', message: `Creating Mux asset for vod ${vod.id} (${vod.attributes.videoSrcB2.data.attributes.cdnUrl})` })
const res: any = await got.post('https://api.mux.com/video/v1/assets', {
headers: {
'Authorization': `Basic ${Buffer.from(`${appContext.env.MUX_TOKEN_ID}:${appContext.env.MUX_TOKEN_SECRET}`).toString('base64')}`
},
json: {
"input": vod.attributes.videoSrcB2.data.attributes.cdnUrl,
"playback_policy": [
"signed"
]
}
})
const j = await res.json()
const { data: muxData } = j
appContext.logger.log({level: 'debug', message: `Adding Mux Asset to strapi`})
const res2: any = await got.post(`${appContext.env.STRAPI_URL}/api/mux-assets`, {
headers: {
'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
},
json: {
data: {
playbackId: muxData.playback_ids.find((p) => p.policy === 'signed').id,
assetId: muxData.id
}
}
})
const jj = await res2.json()
const { data: muxAssetData } = jj
appContext.logger.log({ level: 'debug', message: `Relating Mux Asset to Vod ${vod.id}` })
const res3: any = await got.put(`${appContext.env.STRAPI_URL}/api/vods/${vod.id}`, {
headers: {
'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
},
json: {
data: {
muxAsset: muxAssetData.id
}
}
})
const jjj = await res3.json()
const { data: strapiData } = jj;
appContext.changed = true
} else {
appContext.logger.log({ level: 'debug', message: 'Doing nothing. Mux Asset is already present.' })
}
}
export async function idempotentlyRemoveMuxFromVod(appContext, vod) {
// first see if a Mux playback ID is already absent
// second, optionally act to ensure that the Mux playback ID is absent.
// if we acted, also delete the Mux asset
// if (actNeeded)
// remove Mux playback ID from Vod
// delete Mux asset
const isActNeeded = (vod?.attributes?.muxAsset?.data?.attributes?.assetId) ? true : false
if (isActNeeded) {
appContext.logger.log({ level: 'debug', message: `Deleting Mux Asset for vod ${vod.id}` })
const res: any = await got.delete(`https://api.mux.com/video/v1/assets/${vod.attributes.muxAsset.data.attributes.assetId}`, {
headers: {
'Authorization': `Basic ${Buffer.from(`${appContext.env.MUX_TOKEN_ID}:${appContext.env.MUX_TOKEN_SECRET}`).toString('base64')}`
}
})
const j = await res.json();
const { data: muxData } = j;
appContext.logger.log({ level: 'debug', message: `Removing Mux Asset relation from Vod ${vod.id}` })
const res2: any = await got.put(`${appContext.env.STRAPI_URL}/api/vods/${vod.id}`, {
headers: {
'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
},
json: {
data: {
muxAsset: null
}
}
})
const jj = await res2.json();
const { data: strapiData } = jj;
} else {
appContext.logger.log({ level: 'debug', message: 'Doing nothing. Mux Asset is already absent.' })
}
}
export function getMuxTargetVods(appContext, muxAllocationCount, vods) {
// get last N published vods
// where N is muxAllocationCount
return {
vodsForMux: vods.slice(0, muxAllocationCount),
vodsNotForMux: vods.slice(muxAllocationCount)
}
}
export async function createMuxAsset(appContext, videoUrl) {
const res: any = await got.post('https://api.mux.com/video/v1/assets', {
headers: {
'Authorization': `Basic ${Buffer.from(`${appContext.env.MUX_TOKEN_ID}:${appContext.env.MUX_TOKEN_SECRET}`).toString('base64')}`
},
json: {
input: videoUrl,
playback_policy: ['signed']
}
})
const j = await res.json();
const { data } = j;
}
export async function taskAllocateMux(appContext) {
appContext.logger.log({ level: 'info', message: 'taskAllocateMux begin' })
const pledgeSum = await getPatreonCampaignPledgeSum()
const muxAllocationCount = getMuxAllocationCount(pledgeSum)
appContext.logger.log({ level: 'debug', message: `pledgeSum:${pledgeSum}, muxAllocationCount:${muxAllocationCount}` })
const vods = await getAllPublishedVodsSortedByDate(appContext)
const { vodsForMux, vodsNotForMux } = getMuxTargetVods(appContext, muxAllocationCount, vods)
appContext.logger.log({ level: 'debug', message: `vodsForMux:${vodsForMux.map((v)=>v.id)}, vodsNotForMux:${vodsNotForMux.map((v)=>v.id)}`})
for (const vod of vodsForMux) { await idempotentlyAddMuxToVod(appContext, vod) };
for (const vod of vodsNotForMux) { await idempotentlyRemoveMuxFromVod(appContext, vod) }
}

81
src/taskAssertFfmpeg.ts Normal file
View File

@ -0,0 +1,81 @@
// const dest = (process.env.NODE_ENV === 'production') ? '/bin' : join(homedir(), '.local', 'bin')
// console.log(`ffmpeg dest:${dest}`)
// await ffbinaries.downloadFiles({
// destination: dest,
// platform: 'linux'
// }, function (err, data) {
// if (err) {
// console.log('error while downloading ffmpeg binaries ')
// throw err
// } else {
// console.log(data)
// }
// });
import { join } from 'path';
import { spawn } from 'child_process';
import ffbinaries from 'ffbinaries';
import fs from 'node:fs';
export const getFilename = (appContext, roomName) => {
const name = `${roomName}_${new Date().toISOString()}.ts`
return join(appContext.env.FUTUREPORN_WORKDIR, 'recordings', name);
}
export const assertDirectory = (appContext, directoryPath) => {
appContext.logger.log({ level: 'info', message: `asserting existence of ${directoryPath}` })
if (fs.statSync(directoryPath, { throwIfNoEntry: false }) === undefined) fs.mkdirSync(directoryPath, { recursive: true });
}
export default async function assertFFmpeg(appContext) {
return new Promise((resolve, reject) => {
const childProcess = spawn('ffmpeg', ['-version']);
childProcess.on('error', (err) => {
appContext.logger.log({
level: 'warn',
message: `ffmpeg -version failed, which likely means ffmpeg is not installed or not on $PATH`,
});
appContext.logger.log({
level: 'info',
message: 'downloading ffmpeg binary'
})
const dest = join(appContext.env.FUTUREPORN_WORKDIR, 'bin');
assertDirectory(appContext, dest);
appContext.logger.log({
level: 'info',
message: 'downloading ffmpeg'
})
ffbinaries.downloadFiles({ destination: dest, platform: 'linux' }, function (err, data) {
if (err) reject(err)
else resolve()
});
});
childProcess.on('exit', (code) => {
if (code !== 0) reject(`'ffmpeg -version' exited with code ${code}`)
if (code === 0) {
appContext.logger.log({ level: 'info', message: `ffmpeg PRESENT.` });
resolve()
}
});
})
};
export const assertDependencyDirectory = (appContext) => {
// Extract the directory path from the filename
const directoryPath = join(appContext.env.FUTUREPORN_WORKDIR, 'recordings');
console.log(`asserting ${directoryPath} exists`)
// Check if the directory exists, and create it if it doesn't
if (!fs.existsSync(directoryPath)) {
fs.mkdirSync(directoryPath, { recursive: true });
console.log(`Created directory: ${directoryPath}`);
}
}

View File

@ -0,0 +1,92 @@
import Prevvy from 'prevvy';
import path from 'node:path';
import { got } from 'got';
import { getVod } from './strapi.js';
import { getVideoSrcB2LocalFilePath } from './fsCommon.js';
import { uploadToB2 } from './b2.js'
export async function generateThumbnail (vod, ) {
const fileName = `vod-${vod?.id}-thumb.png`
const thumbnailFilePath = path.join(appContext.env.TMPDIR, fileName)
appContext.logger.log({ level: 'info', message: `Creating thumbnail from ${thumbnailFilePath}`})
const thumb = new Prevvy({
input: getVideoSrcB2LocalFilePath(appContext, vod),
output: thumbnailFilePath,
throttleTimeout: 2000,
width: 128,
cols: 5,
rows: 5,
})
await thumb.generate();
// upload thumbnail to B2
const uploadData = await uploadToB2(appContext, thumbnailFilePath);
return uploadData
}
export async function associateB2WithVod (appContext, uploadData) {
// create b2-file in Strapi
const { data: thumbData } = await got.post(`${appContext.env.STRAPI_URL}/api/b2-files`, {
headers: {
'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
},
json: {
data: {
key: uploadData.key,
uploadId: uploadData.uploadId,
url: uploadData.url
}
}
}).json()
// associate b2-file with vod in strapi
const { data: vodData } = await got.put(`${appContext.env.STRAPI_URL}/api/vods/${vod.id}`, {
headers: {
'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
},
json: {
data: {
thumbnail: thumbData.id
}
}
}).json()
}
export default async function taskAssertThumbnail (appContext, body) {
if (body.model === 'vod') {
const vod = await getVod(appContext, body.entry.id)
appContext.logger.log({ level: 'info', message: 'taskAssertThumbnail begin' })
const cdnUrl = vod?.attributes?.thumbnail?.data?.attributes?.cdnUrl;
const thumbnailKey = vod?.attributes?.thumbnail?.data?.attributes?.key
let generateANewThumbnail = false; // default
// If thumbnail is absent, create it
if (!thumbnailKey && !cdnUrl) {
generateANewThumbnail = true;
// if thumbnail is present, verify HTTP 200
} else {
const response = await got(cdnUrl, { method: 'HEAD', throwHttpErrors: false });
if (!response.ok) {
generateANewThumbnail = true
} else {
// response was OK, so thumb must be good
appContext.logger.log({ level: 'debug', message: 'Doing nothing-- thumbnail already exists.'})
}
}
if (generateANewThumbnail) {
appContext.logger.log({ level: 'debug', message: `Generating a new thumbnail for vod ${vod.attributes.id}`})
const uploadData = await generateThumbnail(appContext, body)
await associateB2WithVod(appContext, uploadData)
}
} else {
appContext.logger.log({ level: 'debug', message: 'Doing nothing-- entry is not a vod.'})
}
}

View File

@ -0,0 +1,10 @@
export default async function taskContinueOnlyIfPublished (appContext, body) {
appContext.logger.log({ level: 'info', message: 'taskContinueOnlyIfPublished started.' })
appContext.logger.log({ level: 'info', message: JSON.stringify(body, 0, 2) })
if (body.model !== 'vod' || (body.entry && body.entry.publishedAt === null) ) {
appContext.logger.log({ level: 'info', message: `WEEE WOO WEE WOOO this is not a vod!... or it's not published. either way we stop doing tasks here.` })
throw new Error('Aborting tasks because this is not a vod or not published.')
} else {
appContext.logger.log({ level: 'debug', message: `taskContinueOnlyIfPublished is complete. This is a published vod so we are continuing with tasks.` })
}
}

View File

@ -0,0 +1,23 @@
import { downloadVideoSrcB2 } from './b2.js';
import { getVod } from './strapi.js';
import fs from 'node:fs';
import { getVideoSrcB2LocalFilePath } from './fsCommon.js';
export default async function taskDownloadVideoSrcB2 (appContext, body) {
appContext.logger.log({ level: 'info', message: 'taskDownloadVideoSrcB2 started.' })
if (body.model === 'vod') {
const vod = await getVod(appContext, body.entry.id)
// download is only necessary when thumbnail or videoSrcHash is missing
const hasThumbnail = (vod?.attributes?.thumbnailB2?.data?.attributes?.cdnUrl) ? true : false
const hasVideoSrcHash = (vod?.attributes?.videoSrcHash) ? true : false
if (!hasThumbnail || !hasVideoSrcHash) {
await downloadVideoSrcB2(appContext, vod)
} else {
appContext.logger.log({ level: 'info', message: 'Doing nothing-- No need for downloading videoSrcB2.' })
}
} else {
appContext.logger.log({ level: 'info', message: 'Doing nothing-- entry is not a vod.'})
}
}

View File

45
src/taskPinIpfsContent.ts Normal file
View File

@ -0,0 +1,45 @@
export async function idempotentlyPinIpfsContent(appContext, body) {
let results = []
const cids = [
body?.entry?.videoSrcHash,
body?.entry?.video240Hash,
body?.entry?.thiccHash
]
appContext.logger.log({ level: 'info', message: `Here are the CIDs yoinked fresh from the webhook:${JSON.stringify(cids)}` })
const validCids = cids.filter((c) => c !== '' && c !== null && c !== undefined)
appContext.logger.log({ level: 'info', message: `Here are the valid CIDs:${JSON.stringify(validCids)}` })
if (validCids.length === 0) return results
for (const vc of validCids) {
appContext.logger.log({ level: 'info', message: `checking to see if ${vc} is pinned` })
const pinCount = await appContext.cluster.getPinCount(vc)
if (pinCount < 1) {
appContext.logger.log({ level: 'info', message: `${vc} is pinned on ${pinCount} appContext.cluster peers.` })
const pinnedCid = await appContext.cluster.pinAdd(vc)
results.push(pinnedCid)
}
}
return results
}
export async function taskPinIpfsContent(appContext, body) {
appContext.logger.log({ level: 'info', message: `idempotentlyPinIpfsContent` })
appContext.logger.log({ level: 'info', message: JSON.stringify(body) })
const pins = await idempotentlyPinIpfsContent(appContext, body, appContext.cluster)
appContext.logger.log({ level: 'info', message: `${JSON.stringify(pins)}` })
if (pins.length > 0) {
appContext.logger.log({ level: 'info', message: `Pinned ${pins}` })
return {
message: `Pinned ${pins}`
}
} else {
appContext.logger.log({ level: 'info', message: `Nothing to pin!` })
return {
message: `Nothing to pin`
}
}
}

View File

@ -0,0 +1,10 @@
export default async function taskTriggerWebsiteBuild (appContext) {
if (appContext.changed) {
appContext.logger.log({
level: 'info',
message: `@TODO -- trigger website build. This is not automated at the moment because we build on local dev machine and we don't have a good way of triggering a build there. Maybe some zerotier network with faye would be a solution here.`
})
}
}

0
src/vod.ts Normal file
View File

92
tasks/assertThumbnail.ts Normal file
View File

@ -0,0 +1,92 @@
import Prevvy from 'prevvy';
import path from 'node:path';
import { got } from 'got';
import { getVod } from '../lib/strapi.js';
import { getVideoSrcB2LocalFilePath } from '../lib/fsCommon.js';
import { uploadToB2 } from '../lib/b2.js'
export async function generateThumbnail (vod) {
const fileName = `vod-${vod?.id}-thumb.png`
const thumbnailFilePath = path.join(appContext.env.TMPDIR, fileName)
appContext.logger.log({ level: 'info', message: `Creating thumbnail from ${thumbnailFilePath}`})
const thumb = new Prevvy({
input: getVideoSrcB2LocalFilePath(appContext, vod),
output: thumbnailFilePath,
throttleTimeout: 2000,
width: 128,
cols: 5,
rows: 5,
})
await thumb.generate();
// upload thumbnail to B2
const uploadData = await uploadToB2(appContext, thumbnailFilePath);
return uploadData
}
export async function associateB2WithVod (appContext, uploadData) {
// create b2-file in Strapi
const { data: thumbData } = await got.post(`${appContext.env.STRAPI_URL}/api/b2-files`, {
headers: {
'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
},
json: {
data: {
key: uploadData.key,
uploadId: uploadData.uploadId,
url: uploadData.url
}
}
}).json()
// associate b2-file with vod in strapi
const { data: vodData } = await got.put(`${appContext.env.STRAPI_URL}/api/vods/${vod.id}`, {
headers: {
'Authorization': `Bearer ${appContext.env.STRAPI_API_KEY}`
},
json: {
data: {
thumbnail: thumbData.id
}
}
}).json()
}
export default async function taskAssertThumbnail (appContext, body) {
if (body.model === 'vod') {
const vod = await getVod(appContext, body.entry.id)
appContext.logger.log({ level: 'info', message: 'taskAssertThumbnail begin' })
const cdnUrl = vod?.attributes?.thumbnail?.data?.attributes?.cdnUrl;
const thumbnailKey = vod?.attributes?.thumbnail?.data?.attributes?.key
let generateANewThumbnail = false; // default
// If thumbnail is absent, create it
if (!thumbnailKey && !cdnUrl) {
generateANewThumbnail = true;
// if thumbnail is present, verify HTTP 200
} else {
const response = await got(cdnUrl, { method: 'HEAD', throwHttpErrors: false });
if (!response.ok) {
generateANewThumbnail = true
} else {
// response was OK, so thumb must be good
appContext.logger.log({ level: 'debug', message: 'Doing nothing-- thumbnail already exists.'})
}
}
if (generateANewThumbnail) {
appContext.logger.log({ level: 'debug', message: `Generating a new thumbnail for vod ${vod.attributes.id}`})
const uploadData = await generateThumbnail(appContext, body)
await associateB2WithVod(appContext, uploadData)
}
} else {
appContext.logger.log({ level: 'debug', message: 'Doing nothing-- entry is not a vod.'})
}
}

101
tasks/createMuxAsset.ts Normal file
View File

@ -0,0 +1,101 @@
import { IMuxAsset, IVod } from "../lib/vods.js";
import fetch from 'node-fetch';
import { IJobData } from "../worker.js";
import PgBoss from "pg-boss";
import { getVod } from "../lib/vods.js";
export interface ICreateMuxAssetResponse {
id: string;
}
export async function createMuxAsset(job: PgBoss.Job, env: NodeJS.ProcessEnv) {
const data = job.data as IJobData
const id = data.id;
const vod = await getVod(id);
if (!vod) {
const msg = `panick! vod was not fetched`
console.error(msg)
throw new Error(msg)
}
if (!vod?.videoSrcB2?.cdnUrl) {
const msg = `panick! videoSrcB2 missing on vod ${vod.id}`
console.error(msg)
throw new Error(msg)
}
console.log(`Creating Mux asset for vod ${vod.id} (${vod.videoSrcB2.cdnUrl})`);
if (!vod.videoSrcB2?.cdnUrl) {
const msg = 'panic! videoSrcB2.cdnUrl is missing!';
console.error(msg);
throw new Error(msg);
}
// Create Mux Asset
const muxResponse = await fetch('https://api.mux.com/video/v1/assets', {
method: 'POST',
headers: {
'Authorization': `Basic ${Buffer.from(`${env.MUX_TOKEN_ID}:${env.MUX_TOKEN_SECRET}`).toString('base64')}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
"input": vod.videoSrcB2.cdnUrl,
"playback_policy": [
"signed"
]
}),
});
const muxData = await muxResponse.json() as { playback_ids: Array<string>; id: string };
console.log(muxData)
if (!muxData?.playback_ids) {
const msg = `panick! muxData was missing playback_ids`
console.error(msg)
throw new Error(msg)
}
console.log(`Adding Mux Asset to strapi`);
const playbackId = muxData.playback_ids.find((p: any) => p.policy === 'signed')
if (!playbackId) {
const msg = `panick: playbackId was not found in the muxData`
console.error(msg)
throw new Error(msg)
}
// Add Mux Asset to Strapi
const muxAssetResponse = await fetch(`${env.STRAPI_URL}/api/mux-assets`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${env.STRAPI_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
data: {
playbackId: playbackId,
assetId: muxData.id
}
}),
});
const muxAssetData = await muxAssetResponse.json() as ICreateMuxAssetResponse;
console.log({ level: 'debug', message: `Relating Mux Asset to Vod ${vod.id}` });
// Relate Mux Asset to Vod
const strapiResponse = await fetch(`${env.STRAPI_URL}/api/vods/${vod.id}`, {
method: 'PUT',
headers: {
'Authorization': `Bearer ${env.STRAPI_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
data: {
muxAsset: muxAssetData.id
}
}),
});
const strapiData = await strapiResponse.json();
}

30
tasks/deleteThumbnail.ts Normal file
View File

@ -0,0 +1,30 @@
import PgBoss from "pg-boss";
import { strapiUrl } from "../lib/constants.js";
import fetch from 'node-fetch';
import { getVod } from '../lib/vods.js';
interface IJobData {
id: number;
}
export async function deleteThumbnail(data: PgBoss.Job, boss: PgBoss, env: NodeJS.ProcessEnv) {
const jobData = data.data as IJobData;
const vod = await getVod(jobData.id)
if (!vod?.thumbnail?.id) throw new Error('vod.thumbnail was missing')
const res = await fetch(`${strapiUrl}/api/b2-files/${vod?.thumbnail.id}`, {
method: 'DELETE',
headers: {
'Authorization': `Bearer ${env.STRAPI_API_KEY}`
}
})
if (!res.ok) {
console.log(`Response code: ${res.status} (ok:${res.ok})`)
throw new Error(`could not delete thumbnail due to fetch response error ${res.body}`);
} else {
console.log(`thumbnail ${vod.thumbnail.id} deleted`)
return
}
}

156
tasks/generateThumbnail.ts Normal file
View File

@ -0,0 +1,156 @@
import Prevvy from 'prevvy';
import path from 'node:path';
import { got } from 'got';
import { getVod } from '../lib/vods.js';
import { getVideoSrcB2LocalFilePath } from '../lib/fsCommon.js';
import { uploadToB2 } from '../lib/b2.js'
import { IVod } from '../lib/vods.js';
import { IJobData } from '../worker.js';
import PgBoss from 'pg-boss';
import { IB2File } from '../lib/b2File.js';
export interface IUploadData {
key: string;
uploadId: string;
url: string;
}
export async function __generateThumbnail (vod: IVod, env: NodeJS.ProcessEnv): Promise<string> {
const fileName = `vod-${vod?.id}-thumb.png`
const thumbnailFilePath = path.join(env.TMPDIR, fileName)
const videoInputUrl = vod.videoSrcB2?.cdnUrl;
console.log(`Creating thumbnail from ${videoInputUrl} ---> ${thumbnailFilePath}`)
const thumb = new Prevvy({
input: videoInputUrl,
output: thumbnailFilePath,
throttleTimeout: 2000,
width: 128,
cols: 5,
rows: 5,
})
await thumb.generate();
return thumbnailFilePath
}
export async function associateB2WithVod(vod: IVod, uploadData: IUploadData, env: NodeJS.ProcessEnv) {
console.log(`lets create b2-file in Strapi`);
// Create the B2 file
const thumbResponse = await fetch(`${env.STRAPI_URL}/api/b2-files`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${env.STRAPI_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
data: {
key: uploadData.key,
uploadId: uploadData.uploadId,
url: uploadData.url,
cdnUrl: `https://futureporn-b2.b-cdn.net/${uploadData.key}`
},
}),
});
if (!thumbResponse.ok) {
const msg = `Failed to create B2 file: ${thumbResponse.statusText}`
console.error(msg)
throw new Error(msg);
}
const thumbData = await thumbResponse.json() as { data: IB2File };
console.log(` B2 file creation complete for B2 file id: ${thumbData.data.id}`);
console.log(` ^v^v^v^v^v^v`);
console.log(thumbData);
console.log(`lets associate B2-file with VOD ${vod.id} in Strapi`);
// Associate B2 file with VOD
const associateResponse = await fetch(`${env.STRAPI_URL}/api/vods/${vod.id}`, {
method: 'PUT',
headers: {
'Authorization': `Bearer ${env.STRAPI_API_KEY}`,
'Content-Type': 'application/json',
},
body: JSON.stringify({
data: {
thumbnail: thumbData.data.id,
},
}),
});
if (!associateResponse.ok) {
const msg = `Failed to associate B2 file with VOD: ${associateResponse.statusText}`;
console.error(msg)
throw new Error(msg)
}
console.log(`Association complete`);
}
// export default async function taskAssertThumbnail (appContext, body) {
// if (body.model === 'vod') {
// const vod = await getVod(appContext, body.entry.id)
// appContext.logger.log({ level: 'info', message: 'taskAssertThumbnail begin' })
// const cdnUrl = vod?.attributes?.thumbnail?.data?.attributes?.cdnUrl;
// const thumbnailKey = vod?.attributes?.thumbnail?.data?.attributes?.key
// let generateANewThumbnail = false; // default
// // If thumbnail is absent, create it
// if (!thumbnailKey && !cdnUrl) {
// generateANewThumbnail = true;
// // if thumbnail is present, verify HTTP 200
// } else {
// const response = await got(cdnUrl, { method: 'HEAD', throwHttpErrors: false });
// if (!response.ok) {
// generateANewThumbnail = true
// } else {
// // response was OK, so thumb must be good
// appContext.logger.log({ level: 'debug', message: 'Doing nothing-- thumbnail already exists.'})
// }
// }
// if (generateANewThumbnail) {
// appContext.logger.log({ level: 'debug', message: `Generating a new thumbnail for vod ${vod.attributes.id}`})
// const uploadData = await generateThumbnail(appContext, body)
// await associateB2WithVod(appContext, uploadData)
// }
// } else {
// appContext.logger.log({ level: 'debug', message: 'Doing nothing-- entry is not a vod.'})
// }
// }
export async function generateThumbnail(job: PgBoss.Job, env: NodeJS.ProcessEnv) {
const data = job.data as IJobData;
console.log(data)
console.log(`>>>>>>>>>>>>>>>>generateThumbnail begin. finding vod id:${data.id} (${typeof data.id})`)
const vod = await getVod(data.id);
if (!vod) throw new Error('panic! vod missing')
console.log('__generateThumbnail begin')
const thumbnailFilePath = await __generateThumbnail(vod, env);
console.log(` uploading thumbnail ${thumbnailFilePath} for vod ${data.id} to B2`);
const uploadData = await uploadToB2(env, thumbnailFilePath);
if (!uploadData) {
const msg = 'panic! uploadData missing'
console.error(msg)
throw new Error(msg);
}
console.log(` associating thumbnail for vod ${data.id} with strapi`)
await associateB2WithVod(vod, uploadData, env)
console.log(` 👍👍👍 thumbnail associated with vod ${data.id}`);
}

110
tasks/identifyVodIssues.ts Normal file
View File

@ -0,0 +1,110 @@
import PgBoss from "pg-boss";
import { getVods, getVod, IVod, getRandomVod } from "../lib/vods.js";
import fetch from 'node-fetch';
import { isBefore } from 'date-fns';
import { IJobData } from "../worker.js";
interface IIssueDefinition {
name: string;
check: ((vod: IVod) => Promise<boolean>);
solution: string;
}
/*
The check functions return true if there is an issue
This is an exhaustive list of all possible problems that can exist on a vod.
*/
const issueDefinitions: IIssueDefinition[] = [
// {
// name: 'thumbnailMissing',
// check: async (vod) => {
// if (!vod?.thumbnail?.cdnUrl) return true;
// else return false;
// },
// solution: 'generateThumbnail'
// },
// {
// name: 'thumbnailUnreachable',
// check: async (vod) => {
// if (!vod?.thumbnail?.cdnUrl) return false; // false because the problem isn't explicitly that the thumb is unreachable
// const response = await fetch(vod.thumbnail.cdnUrl);
// if (!response.ok) return true;
// else return false;
// },
// solution: 'deleteThumbnail'
// },
{
name: 'muxAssetMissing',
check: async (vod) => {
// we only want to allocate new videos
// so we only consider vods published after
// a certain date
const allocationCutoffDate = new Date('2019-09-24T00:00:00.000Z');
const vodDate = new Date(vod.date2);
const isVodOld = isBefore(vodDate, allocationCutoffDate)
console.log(`muxAsset:${vod?.muxAsset?.assetId}, vodDate:${vod.date2}, allocationCutoffDate:${allocationCutoffDate.toISOString()}, isVodOld:${isVodOld}`)
if (isVodOld) return false;
if (!!vod?.muxAsset?.assetId) return false;
console.info(`vod ${vod.id} is missing a muxAsset!`)
return true;
},
solution: 'createMuxAsset'
}
]
export async function identifyVodIssues(job: PgBoss.Job, boss: PgBoss) {
const data = job.data as IJobData;
let vod: IVod | null;
console.log(` the id fed to us was ${data?.id}`)
// determine if we received a vod id or if we need to choose a random vod
if (!data?.id) {
// get a random vod
vod = await getRandomVod();
} else {
// get a vod by id
vod = await getVod(data.id);
}
if (!vod) {
const msg = 'Panic! Could not get a vod';
console.error(msg);
throw new Error(msg);
}
const id = vod?.id;
console.log(`## VOD ${id} ##`);
for (const iDef of issueDefinitions) {
const isProblem = await iDef.check(vod);
const status = isProblem ? '🔴 FAIL' : '🟢 PASS';
console.log(` ${status} ${iDef.name} ${isProblem ? `(queueing ${iDef.solution} to solve.)` : ''}`);
if (isProblem) {
boss.send(iDef.solution, { id: data.id }, { priority: 20 });
}
}
}
// import { PrismaClient } from '@prisma/client'
// const prisma = new PrismaClient()
// async function main() {
// const users = await prisma.user.findMany()
// console.log(users)
// }
// main()
// .then(async () => {
// await prisma.$disconnect()
// })
// .catch(async (e) => {
// console.error(e)
// await prisma.$disconnect()
// process.exit(1)
// })

15
tasks/thumbnails.ts Normal file
View File

@ -0,0 +1,15 @@
const jobName = 'onCompleteFtw'
const requestPayload = { token:'trivial' }
const responsePayload = { message: 'so verbose', code: '1234' }
boss.onComplete(jobName, job => {
assert.strictEqual(jobId, job.data.request.id)
assert.strictEqual(job.data.request.data.token, requestPayload.token)
assert.strictEqual(job.data.response.message, responsePayload.message)
assert.strictEqual(job.data.response.code, responsePayload.code)
finished() // test suite completion callback
})
const jobId = await boss.send(jobName, requestPayload)
const job = await boss.fetch(jobName)
await boss.complete(job.id, responsePayload)

16
tsconfig.json Normal file
View File

@ -0,0 +1,16 @@
{
"compilerOptions": {
"outDir": "./dist",
"allowJs": false,
"target": "ES2020",
"module": "node16",
"moduleResolution": "node16",
"esModuleInterop": true,
"strict": false
},
"include": ["src/**/*"],
"exclude": [
"node_modules",
"<node_internals>/**"
]
}

26
worker.ts Normal file
View File

@ -0,0 +1,26 @@
import * as dotenv from 'dotenv'
import PgBoss from 'pg-boss';
import {identifyVodIssues} from './tasks/identifyVodIssues.js';
import {generateThumbnail} from './tasks/generateThumbnail.js';
import {deleteThumbnail} from './tasks/deleteThumbnail.js';
export interface IJobData {
id?: number;
env: NodeJS.ProcessEnv;
}
dotenv.config()
if (!process.env.PG_DSN) throw new Error('PG_DSN is missing in env')
const PG_DSN = process.env.PG_DSN
async function main () {
const boss = new PgBoss(PG_DSN);
await boss.start()
await boss.work('identifyVodIssues', (job: PgBoss.Job) => identifyVodIssues(job, boss));
await boss.work('generateThumbnail', { teamConcurrency: 1, teamSize: 1 }, (job: PgBoss.Job) => generateThumbnail(job, process.env));
await boss.work('deleteThumbnail', (job: PgBoss.Job) => deleteThumbnail(job, boss, process.env));
}
main()