move fetchers to their own package

This commit is contained in:
CJ_Clippy 2024-09-05 21:39:08 -08:00
parent 614bf16cf8
commit 4d65294f7d
70 changed files with 812 additions and 443 deletions

@ -10,6 +10,10 @@ Kubernetes for Production, deployed using FluxCD
Tested on VKE v1.30.0+1 (PVCs on other versions may not be fulfilled)
devbox for shareable development environment
ggshield for preventing git commits containing secrets
direnv for loading .envrc
Graphile Worker for work queue, cron

@ -184,6 +184,7 @@ docker_build(
'./services/bot',
'./packages/types',
'./packages/utils',
'./packages/fetchers',
],
dockerfile='./dockerfiles/bot.dockerfile',
target='dev',
@ -202,6 +203,7 @@ docker_build(
'./pnpm-workspace.yaml',
'./packages/types',
'./packages/utils',
'./packages/fetchers',
'./services/scout',
],
dockerfile='./dockerfiles/scout.dockerfile',
@ -324,6 +326,7 @@ docker_build(
'./services/mailbox',
'./packages/types',
'./packages/utils',
'./packages/fetchers',
'./packages/video',
'./packages/storage',
],
@ -350,6 +353,7 @@ docker_build(
'./pnpm-workspace.yaml',
'./packages/types',
'./packages/utils',
'./packages/fetchers',
'./services/capture',
],
live_update=[
@ -385,7 +389,7 @@ docker_build(
k8s_resource(
workload='scout',
resource_deps=['postgresql-primary'],
port_forwards=['5134'],
port_forwards=['8134'],
labels=['backend'],
)
k8s_resource(

@ -40,6 +40,8 @@ spec:
- name: capture-worker
image: "{{ .Values.capture.imageName }}"
env:
# - name: NODE_DEBUG
# value: "stream.onWriteComplete"
- name: SCOUT_URL
value: "{{ .Values.scout.url }}"
- name: FUNCTION
@ -67,7 +69,7 @@ spec:
value: "{{ .Values.s3.endpoint }}"
- name: S3_REGION
value: "{{ .Values.s3.region }}"
- name: S3_BUCKET
- name: S3_USC_BUCKET
value: "{{ .Values.s3.buckets.usc }}"
- name: S3_ACCESS_KEY_ID
valueFrom:

@ -77,8 +77,8 @@ chihaya:
scout:
imageName: fp/scout
replicas: 1
port: 5134
url: http://scout.futureporn.svc.cluster.local:5134
port: 8134
url: http://scout.futureporn.svc.cluster.local:8134
postgrest:
url: http://postgrest.futureporn.svc.cluster.local:9000
image: postgrest/postgrest

@ -11,7 +11,8 @@
"ffmpeg@latest",
"yt-dlp@latest",
"python310@latest",
"python310Packages.pip@latest"
"python310Packages.pip@latest",
"vips@latest"
],
"env": {
"DEVBOX_COREPACK_ENABLED": "true",

@ -680,6 +680,114 @@
}
}
},
"vips@latest": {
"last_modified": "2024-09-01T03:39:50Z",
"resolved": "github:NixOS/nixpkgs/4a9443e2a4e06cbaff89056b5cdf6777c1fe5755#vips",
"source": "devbox-search",
"version": "8.15.2",
"systems": {
"aarch64-darwin": {
"outputs": [
{
"name": "bin",
"path": "/nix/store/06dc4n06zs4jygbqa8c7fg9dcg39q9j9-vips-8.15.2-bin",
"default": true
},
{
"name": "man",
"path": "/nix/store/442bpcz5cx0x2ddc2cp4041arqf87ph0-vips-8.15.2-man",
"default": true
},
{
"name": "dev",
"path": "/nix/store/1l68927a29x2g1kyxcf7kxsh0lpnshvn-vips-8.15.2-dev"
},
{
"name": "out",
"path": "/nix/store/6jnr4f8i012ki83bp0aml84acfigsdqr-vips-8.15.2"
}
],
"store_path": "/nix/store/06dc4n06zs4jygbqa8c7fg9dcg39q9j9-vips-8.15.2-bin"
},
"aarch64-linux": {
"outputs": [
{
"name": "bin",
"path": "/nix/store/17asmchn0w4s7gic59r75n3drqdv12cn-vips-8.15.2-bin",
"default": true
},
{
"name": "man",
"path": "/nix/store/1syxsf45h4ppyx1q0nynx1wnl4dxrnnv-vips-8.15.2-man",
"default": true
},
{
"name": "dev",
"path": "/nix/store/afy2j1kdch6sb3ryqljlf163khs7hwkl-vips-8.15.2-dev"
},
{
"name": "devdoc",
"path": "/nix/store/sifwrcrirz82gv0d9n0a683qz88msfzp-vips-8.15.2-devdoc"
},
{
"name": "out",
"path": "/nix/store/rhlb3vk0sqy6cn521806qpw1rqzji0iv-vips-8.15.2"
}
],
"store_path": "/nix/store/17asmchn0w4s7gic59r75n3drqdv12cn-vips-8.15.2-bin"
},
"x86_64-darwin": {
"outputs": [
{
"name": "bin",
"path": "/nix/store/gzqa0mv4czbw0c5243r97bhw8kch8qmi-vips-8.15.2-bin",
"default": true
},
{
"name": "man",
"path": "/nix/store/38knm1dzqngi6w517g147gnpd2q720b1-vips-8.15.2-man",
"default": true
},
{
"name": "dev",
"path": "/nix/store/f9vf62bnkrn1azgyvcjnmgsh1jknc33b-vips-8.15.2-dev"
},
{
"name": "out",
"path": "/nix/store/487rhwp65bxzhanzf7brma1lcbis7w6c-vips-8.15.2"
}
],
"store_path": "/nix/store/gzqa0mv4czbw0c5243r97bhw8kch8qmi-vips-8.15.2-bin"
},
"x86_64-linux": {
"outputs": [
{
"name": "bin",
"path": "/nix/store/bfy830xkb34pn4dym88rhzsqvlav25bq-vips-8.15.2-bin",
"default": true
},
{
"name": "man",
"path": "/nix/store/ppi3phzcnr8bh4q4jnz5pp2grvh1dki9-vips-8.15.2-man",
"default": true
},
{
"name": "dev",
"path": "/nix/store/6v5dd9lz15dls9k62dqnmdpi2af1y59n-vips-8.15.2-dev"
},
{
"name": "devdoc",
"path": "/nix/store/9kf40jgp84bkxc6202x8z4nqwzn8627v-vips-8.15.2-devdoc"
},
{
"name": "out",
"path": "/nix/store/k6aq70ncjh3d0vjcw6r8grw764hlagn5-vips-8.15.2"
}
],
"store_path": "/nix/store/bfy830xkb34pn4dym88rhzsqvlav25bq-vips-8.15.2-bin"
}
}
},
"yt-dlp@latest": {
"last_modified": "2024-07-18T22:08:26Z",
"resolved": "github:NixOS/nixpkgs/cfa5366588c940ab6ee3bee399b337175545c664#yt-dlp",

@ -10,6 +10,7 @@ COPY pnpm-lock.yaml .npmrc package.json .
COPY ./services/bot/ ./services/bot/
COPY ./packages/types/ ./packages/types/
COPY ./packages/utils/ ./packages/utils/
COPY ./packages/fetchers/ ./packages/fetchers/
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --frozen-lockfile --prefer-offline

@ -24,6 +24,7 @@ COPY package.json pnpm-lock.yaml pnpm-workspace.yaml .npmrc .
COPY ./services/capture/package.json ./services/capture/pnpm-lock.yaml ./services/capture/
COPY ./packages/types/package.json ./packages/types/pnpm-lock.yaml ./packages/types/
COPY ./packages/utils/package.json ./packages/utils/pnpm-lock.yaml ./packages/utils/
COPY ./packages/fetchers/package.json ./packages/fetchers/pnpm-lock.yaml ./packages/fetchers/
## install npm packages
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch
@ -33,6 +34,7 @@ RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --pre
COPY ./services/capture/ ./services/capture/
COPY ./packages/types/ ./packages/types/
COPY ./packages/utils/ ./packages/utils/
COPY ./packages/fetchers/ ./packages/fetchers/
## Run the build process and generate the artifacts
RUN pnpm run -r build

@ -20,6 +20,7 @@ RUN mkdir -p /app/services/factory && mkdir -p /prod/factory
## Copy manfiests, lockfiles, and configs into docker context
COPY package.json pnpm-lock.yaml .npmrc .
COPY ./packages/utils/pnpm-lock.yaml ./packages/utils/package.json ./packages/utils/
COPY ./packages/fetchers/package.json ./packages/fetchers/pnpm-lock.yaml ./packages/fetchers/
COPY ./packages/storage/pnpm-lock.yaml ./packages/storage/package.json ./packages/storage/
COPY ./packages/types/pnpm-lock.yaml ./packages/types/package.json ./packages/types/
COPY ./services/factory/pnpm-lock.yaml ./services/factory/package.json ./services/factory/
@ -31,6 +32,7 @@ RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install -g node-gyp --pre
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --frozen-lockfile --prefer-offline
## Copy package code into docker context
COPY ./packages/utils/ ./packages/utils/
COPY ./packages/fetchers/ ./packages/fetchers/
RUN ls -la /app/packages/utils/node_modules/prevvy/
RUn cat ./packages/utils/package.json
COPY ./packages/storage/ ./packages/storage/

@ -24,6 +24,7 @@ RUN pnpm fetch
# COPY pnpm-lock.yaml .npmrc package.json .
COPY ./services/next ./services/next
COPY ./packages/types ./packages/types
COPY ./packages/fetchers ./packages/fetchers
# COPY ./packages/strapi ./packages/strapi
# COPY ./packages/utils ./packages/utils

@ -22,6 +22,7 @@ COPY pnpm-lock.yaml .npmrc package.json .
COPY ./services/scout/ ./services/scout/
COPY ./packages/types/ ./packages/types/
COPY ./packages/utils/ ./packages/utils/
COPY ./packages/fetchers/ ./packages/fetchers/
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --frozen-lockfile --prefer-offline

@ -1,62 +0,0 @@
## d.worker.dockerfile
##
## @futureporn/worker is the system component which runs background tasks.
## Tasks such as thumbnail generation, video encoding, file transfers, etc.
##
## @todo future improvement might be merging the dockerfiles for the various monorepo packages.
## this is not an easy task, so I'm not doing it right now.
## "make it work, make it right, make it fast" (in that order)
## Right now we are making things work with separate dockerfiles for each package.
## One thing to determine is build speed. If we're developing in Tilt and have to wait 20 minutes for the build to complete
## every time we change a file in any dependent package, then merging dockerfiles is not desirable.
## One of the slow parts of the docker build is copying all package directories into the build context.
## If we have a lot of packages, it takes a long time.
## I have yet to determine performance benchmarks, so it's unclear if merging dockerfiles is desirable.
##
## @todo another performance improvement would almost certainly be to move strapi, next, and similar packages from `packages/*` into `services/*`
## this way, when we're building the various @futureporn library-type packages, we don't have to filter and COPY the dependency packages one-by-one.
## instead, we add the entire `packages/*` directory and then move on to the next step.
FROM node:20 AS base
ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH"
WORKDIR /app
FROM base AS build
WORKDIR /app
RUN mkdir -p /app/packages/worker && mkdir -p /prod/worker
## Copy manfiests, lockfiles, and configs into docker context
COPY package.json pnpm-lock.yaml .npmrc .
RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
# COPY ./packages/storage/pnpm-lock.yaml ./packages/storage/package.json ./packages/storage/
# COPY ./packages/types/pnpm-lock.yaml ./packages/types/package.json ./packages/types/
# COPY ./packages/utils/pnpm-lock.yaml ./packages/utils/package.json ./packages/utils/
COPY ./packages/worker/pnpm-lock.yaml ./packages/worker/package.json ./packages/worker/
## Install npm packages
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch
## we install node-gyp explicitly in order for sharp to install properly
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install -g node-gyp --prefer-offline
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --frozen-lockfile --prefer-offline
## Copy package code into docker context
# COPY ./packages/storage/ ./packages/storage/
# COPY ./packages/types/ ./packages/types/
# COPY ./packages/utils/ ./packages/utils/
COPY ./packages/worker/ ./packages/worker/
## Transpile TS into JS
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm -r build
## Copy all production code into one place
## `pnpm deploy` copies all dependencies into an isolated node_modules directory inside the target dir
## @see https://pnpm.io/cli/deploy
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm deploy --filter=@futureporn/worker --prod /prod/worker
FROM base AS worker
COPY --from=build /prod/worker .
RUN ls -la .
ENTRYPOINT ["pnpm", "start"]

@ -0,0 +1,5 @@
# @futureporn/fetchers
`fetch()` wrappers for getting/setting values in our database.
distinct from @futureporn/scout which is fetching data from external sources.

@ -0,0 +1,23 @@
{
"name": "@futureporn/fetchers",
"type": "module",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 0"
},
"exports": {
"./*.ts": "./src/*.ts"
},
"keywords": [],
"author": "",
"license": "Unlicense",
"dependencies": {
"@futureporn/types": "workspace:^"
},
"devDependencies": {
"@types/chai": "^4.3.19",
"chai": "^5.1.1"
}
}

77
packages/fetchers/pnpm-lock.yaml generated Normal file

@ -0,0 +1,77 @@
lockfileVersion: '9.0'
settings:
autoInstallPeers: true
excludeLinksFromLockfile: false
importers:
.:
dependencies:
'@futureporn/types':
specifier: workspace:^
version: link:../types
devDependencies:
'@types/chai':
specifier: ^4.3.19
version: 4.3.19
chai:
specifier: ^5.1.1
version: 5.1.1
packages:
'@types/chai@4.3.19':
resolution: {integrity: sha512-2hHHvQBVE2FiSK4eN0Br6snX9MtolHaTo/batnLjlGRhoQzlCL61iVpxoqO7SfFyOw+P/pwv+0zNHzKoGWz9Cw==}
assertion-error@2.0.1:
resolution: {integrity: sha512-Izi8RQcffqCeNVgFigKli1ssklIbpHnCYc6AknXGYoB6grJqyeby7jv12JUQgmTAnIDnbck1uxksT4dzN3PWBA==}
engines: {node: '>=12'}
chai@5.1.1:
resolution: {integrity: sha512-pT1ZgP8rPNqUgieVaEY+ryQr6Q4HXNg8Ei9UnLUrjN4IA7dvQC5JB+/kxVcPNDHyBcc/26CXPkbNzq3qwrOEKA==}
engines: {node: '>=12'}
check-error@2.1.1:
resolution: {integrity: sha512-OAlb+T7V4Op9OwdkjmguYRqncdlx5JiofwOAUkmTF+jNdHwzTaTs4sRAGpzLF3oOz5xAyDGrPgeIDFQmDOTiJw==}
engines: {node: '>= 16'}
deep-eql@5.0.2:
resolution: {integrity: sha512-h5k/5U50IJJFpzfL6nO9jaaumfjO/f2NjK/oYB2Djzm4p9L+3T9qWpZqZ2hAbLPuuYq9wrU08WQyBTL5GbPk5Q==}
engines: {node: '>=6'}
get-func-name@2.0.2:
resolution: {integrity: sha512-8vXOvuE167CtIc3OyItco7N/dpRtBbYOsPsXCz7X/PMnlGjYjSGuZJgM1Y7mmew7BKf9BqvLX2tnOVy1BBUsxQ==}
loupe@3.1.1:
resolution: {integrity: sha512-edNu/8D5MKVfGVFRhFf8aAxiTM6Wumfz5XsaatSxlD3w4R1d/WEKUTydCdPGbl9K7QG/Ca3GnDV2sIKIpXRQcw==}
pathval@2.0.0:
resolution: {integrity: sha512-vE7JKRyES09KiunauX7nd2Q9/L7lhok4smP9RZTDeD4MVs72Dp2qNFVz39Nz5a0FVEW0BJR6C0DYrq6unoziZA==}
engines: {node: '>= 14.16'}
snapshots:
'@types/chai@4.3.19': {}
assertion-error@2.0.1: {}
chai@5.1.1:
dependencies:
assertion-error: 2.0.1
check-error: 2.1.1
deep-eql: 5.0.2
loupe: 3.1.1
pathval: 2.0.0
check-error@2.1.1: {}
deep-eql@5.0.2: {}
get-func-name@2.0.2: {}
loupe@3.1.1:
dependencies:
get-func-name: 2.0.2
pathval@2.0.0: {}

@ -0,0 +1,27 @@
const requiredEnvVars = [
'POSTGREST_URL',
'AUTOMATION_USER_JWT',
'SCOUT_URL',
] as const;
const getEnvVar = (key: typeof requiredEnvVars[number]): string => {
const value = process.env[key];
if (!value) {
throw new Error(`Missing ${key} env var`);
}
return value;
};
export interface Config {
postgrestUrl: string;
automationUserJwt: string;
scoutUrl: string;
}
export const configs: Config = {
scoutUrl: getEnvVar('SCOUT_URL'),
postgrestUrl: getEnvVar('POSTGREST_URL'),
automationUserJwt: getEnvVar('AUTOMATION_USER_JWT')
}

@ -1,4 +1,4 @@
import { configs } from "../config"
import { configs } from "../../../services/factory/src/config"
import type { S3FileRecord, S3FileResponse } from '@futureporn/types'
export default async function createS3File(s3File?: S3FileRecord): Promise<S3FileResponse|null> {

@ -1,15 +1,14 @@
import type { Helpers } from 'graphile-worker'
import { configs } from '../config.ts'
import { configs } from './config.ts'
import querystring from 'node:querystring'
export default async function createSegmentInDatabase(s3_key: string, vod_id: string, helpers: Helpers): Promise<string> {
export default async function createSegmentInDatabase(s3_key: string, vod_id: string): Promise<string> {
if (!s3_key) throw new Error('getSegments requires {string} s3_key as first arg');
const segmentPayload = {
s3_key,
vod_id
}
helpers.logger.info(`Creating segment with s3_key=${s3_key}. payload as follows`)
helpers.logger.info(JSON.stringify(segmentPayload))
// helpers.logger.info(`Creating segment with s3_key=${s3_key}. payload as follows`)
// helpers.logger.info(JSON.stringify(segmentPayload))
const res = await fetch(`${configs.postgrestUrl}/segments`, {
method: 'POST',
headers: {
@ -23,7 +22,7 @@ export default async function createSegmentInDatabase(s3_key: string, vod_id: st
if (!res.ok) {
const body = await res.text()
const msg = `failed to create Segment. status=${res.status}, statusText=${res.statusText}, body=${body}`
helpers.logger.error(msg)
console.error(msg)
throw new Error(msg);
}
const location = res.headers.get('location')

@ -1,9 +1,9 @@
import type { Helpers } from 'graphile-worker'
import { configs } from '../config.ts'
// import type { Helpers } from 'graphile-worker'
import { configs } from '../../../services/capture/src/config.ts'
import querystring from 'node:querystring'
export default async function createSegmentsVodLink(vod_id: string, segment_id: string, helpers: Helpers): Promise<number> {
helpers.logger.info(`createSegmentsVodLink with vod_id=${vod_id}, segment_id=${segment_id}`)
export default async function createSegmentsVodLink(vod_id: string, segment_id: string): Promise<string> {
console.info(`createSegmentsVodLink with vod_id=${vod_id}, segment_id=${segment_id}`)
if (!vod_id) throw new Error('createSegmentsVodLink requires {string} vod_id as first arg');
if (!segment_id) throw new Error('createSegmentsVodLink requires {string} segment_id as second arg');
const segmentVodLinkPayload = {
@ -32,5 +32,5 @@ export default async function createSegmentsVodLink(vod_id: string, segment_id:
if (Array.isArray(segmentsId)) throw new Error('segments_vod_links was an array which is unexpected');
const id = segmentsId.split('.').at(-1)
if (!id) throw new Error('failed to get id ');
return parseInt(id)
return id
}

@ -1,4 +1,4 @@
import { configs } from '../config.ts'
import { configs } from '../../../services/bot/src/config.ts'
export default async function createStreamInDatabase(url: string, discordMessageId: string): Promise<string> {
const streamPayload = {

@ -1,7 +1,7 @@
import { configs } from "../config"
import type { Vod, Stream } from '@futureporn/types'
import { configs } from "../../../services/factory/src/config"
import type { VodResponse, Stream } from '@futureporn/types'
export default async function createVod(stream?: Stream): Promise<Vod|null> {
export default async function createVod(stream?: Stream): Promise<VodResponse|null> {
if (!stream) throw new Error(`first argument passed to createVod must be a {Stream}`);
console.log(stream)
const url = `${configs.postgrestUrl}/vods`
@ -26,7 +26,7 @@ export default async function createVod(stream?: Stream): Promise<Vod|null> {
const body = await res.json()
throw new Error(`Problem during createVod. res.status=${res.status}, res.statusText=${res.statusText}, body=${JSON.stringify(body)}`)
}
const json = await res.json() as Vod[]
const json = await res.json() as VodResponse[]
const vod = json[0]
if (!vod) return null
else return vod

@ -1,7 +1,7 @@
import { configs } from "../config.ts"
import { configs } from "../../../services/bot/src/config.ts"
import type { Stream, VodRecord } from "@futureporn/types"
import { sub } from 'date-fns'
import { bot } from "../bot.ts"
import { bot } from "../../../services/bot/src/bot.ts"
export async function findStream(vtuberId: string, lteDate: Date, gteDate: Date): Promise<string|null> {
const fetchUrl = `${configs.postgrestUrl}/streams?select=id&vtuber=eq.${vtuberId}&date=gte.${gteDate.toISOString()}&date=lte.${lteDate.toISOString()}`

@ -1,6 +1,6 @@
import { configs } from "../config.ts"
import { configs } from "../../../services/bot/src/config.ts"
import type { VtuberRecord, VtuberResponse } from "@futureporn/types"
import { bot } from '../bot.ts'
import { bot } from '../../../services/bot/src/bot.ts'
import qs from 'qs'
interface vTuberSearchQuery {

@ -1,12 +1,11 @@
import type { VodResponse } from "@futureporn/types";
import { bot } from "../bot.ts";
import { configs } from "../config.ts";
import { configs } from "./config.ts";
export default async function findVod({ vod_id, discord_message_id }: { vod_id?: string, discord_message_id?: string }): Promise<VodResponse|null> {
const fetchUrl = (!!vod_id)
? `${configs.postgrestUrl}/vods?id=eq.${vod_id}&select=*,segments(bytes,updated_at,created_at)`
? `${configs.postgrestUrl}/vods?id=eq.${vod_id}&select=*,segments(bytes,updated_at,created_at,s3_key)`
: `${configs.postgrestUrl}/vods?discord_message_id=eq.${discord_message_id}&select=*,segments(bytes,updated_at,created_at)`
bot.logger.info(fetchUrl)
console.info(fetchUrl)
const fetchOptions = {
method: 'GET',
headers: {
@ -18,11 +17,11 @@ export default async function findVod({ vod_id, discord_message_id }: { vod_id?:
const res = await fetch(fetchUrl, fetchOptions)
if (!res.ok) {
const msg = `request failed. status=${res.status}, statusText=${res.statusText}`
bot.logger.error(msg)
console.error(msg)
throw new Error(msg)
}
const json = await res.json() as VodResponse[]
// bot.logger.info(`vod results as follows.`)
// bot.logger.info(json)
// console.info(`vod results as follows.`)
// console.info(json)
return json?.at(0) || null
}

@ -1,4 +1,4 @@
import { configs } from '../config.ts'
import { configs } from '../../../services/capture/src/config.ts'
export default async function getPlaylistUrl (url: string): Promise<string|null> {
if (!url) throw new Error(`getPlaylistUrl requires a url, but it was undefined.`);

@ -1,15 +1,13 @@
import type { Segment } from '@futureporn/types'
import type { Helpers } from 'graphile-worker'
import { configs } from '../config.ts'
import { configs } from '../../../services/capture/src/config.ts'
import querystring from 'node:querystring'
export default async function getSegmentsFromDatabase(s3_key: string, helpers: Helpers): Promise<number> {
export default async function getSegmentsFromDatabase(s3_key: string): Promise<number> {
if (!s3_key) throw new Error('getSegments requires {string} s3_key as first arg');
const segmentPayload = {
s3_key
}
helpers.logger.info(`Creating segment with s3_key=${s3_key}. payload as follows`)
helpers.logger.info(JSON.stringify(segmentPayload))
console.info(`Creating segment with s3_key=${s3_key}. payload as follows`)
console.info(JSON.stringify(segmentPayload))
const res = await fetch(`${configs.postgrestUrl}/segments`, {
method: 'POST',
headers: {
@ -23,7 +21,7 @@ export default async function getSegmentsFromDatabase(s3_key: string, helpers: H
if (!res.ok) {
const body = await res.text()
const msg = `failed to create Segment. status=${res.status}, statusText=${res.statusText}, body=${body}`
helpers.logger.error(msg)
console.error(msg)
throw new Error(msg);
}
const location = res.headers.get('location')

@ -1,6 +1,6 @@
import { configs } from "../config.ts"
import { configs } from "../../../services/bot/src/config.ts"
import type { Stream } from '@futureporn/types'
import { bot } from '../bot.ts'
import { bot } from '../../../services/bot/src/bot.ts'
export default async function getStreamFromDatabase(messageId: string): Promise<Stream|null> {

@ -1,5 +1,5 @@
import { type Interaction } from "discordeno"
import { configs } from "../config.ts"
import { configs } from "../../../services/bot/src/config.ts"
import { type Stream } from "@futureporn/types"
import { logger } from "@discordeno/bot"

@ -1,9 +1,7 @@
import { configs } from "../config"
import { Helpers } from "graphile-worker"
import { Stream } from "@futureporn/types"
import { configs } from "./config"
import type { VodResponse } from "@futureporn/types"
export default async function getVod(vodId: string, helpers: Helpers) {
export default async function getVod(vodId: string) {
const url = `${configs.postgrestUrl}/vods?select=*,segments(*)&id=eq.${vodId}`
try {
const res = await fetch(url)
@ -14,11 +12,11 @@ export default async function getVod(vodId: string, helpers: Helpers) {
if (!body[0]) throw new Error('body[0] was expected to be Vod data, but it was either null or undefined.');
return body[0];
} catch (e) {
helpers.logger.error(`encountered an error during getVod()`)
console.error(`encountered an error during getVod()`)
if (e instanceof Error) {
helpers.logger.error(e.message)
console.error(e.message)
} else {
helpers.logger.error(JSON.stringify(e))
console.error(JSON.stringify(e))
}
return null
}

@ -1,5 +1,5 @@
import type { Stream } from "@futureporn/types";
import { configs } from "../config.ts";
import { configs } from "../../../services/bot/src/config.ts";
import { logger } from "@discordeno/bot";
export default async function patchVod(stream_id: string, payload: Partial<Stream>): Promise<void> {

@ -1,5 +1,5 @@
import type { VodRecord } from "@futureporn/types";
import { configs } from "../config.ts";
import { configs } from "../../../services/factory/src/config.ts";
export default async function patchVodInDatabase(vod_id: string, payload: Partial<VodRecord>): Promise<void> {

@ -1,6 +1,5 @@
import type { SegmentResponse } from '@futureporn/types'
import type { Helpers } from 'graphile-worker'
import { configs } from '../config.ts'
import { configs } from '../../../services/capture/src/config.ts'
/**
@ -13,11 +12,9 @@ import { configs } from '../config.ts'
export default async function updateSegmentInDatabase({
segment_id,
fileSize,
helpers
}: {
segment_id: string,
fileSize: number,
helpers: Helpers
}): Promise<SegmentResponse> {
const payload: any = {
@ -25,7 +22,7 @@ export default async function updateSegmentInDatabase({
}
const fetchUrl =`${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=vod:vods(is_recording_aborted)`
// helpers.logger.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`)
console.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`)
const res = await fetch(fetchUrl, {
method: 'PATCH',
headers: {
@ -39,14 +36,14 @@ export default async function updateSegmentInDatabase({
if (!res.ok) {
const body = await res.text()
const msg = `failed to updateSegmentInDatabase. status=${res.status}, statusText=${res.statusText}, body=${body}`
helpers.logger.error(msg)
console.error(msg)
throw new Error(msg);
}
// helpers.logger.info(`response was OK~`)
// console.info(`response was OK~`)
const body = await res.json() as SegmentResponse[];
if (!body[0]) throw new Error(`failed to get a segment that matched segment_id=${segment_id}`);
const bod = body[0]
// helpers.logger.info('the following was the response from PATCH-ing /segments')
// helpers.logger.info(JSON.stringify(bod))
// console.info('the following was the response from PATCH-ing /segments')
// console.info(JSON.stringify(bod))
return bod
}

@ -16,7 +16,7 @@
"author": "@CJ_Clippy",
"license": "Unlicense",
"dependencies": {
"@aws-sdk/client-s3": "^3.637.0",
"@aws-sdk/client-s3": "3.637.0",
"@aws-sdk/lib-storage": "^3.637.0",
"@futureporn/types": "workspace:^",
"@paralleldrive/cuid2": "^2.2.2",

@ -9,7 +9,7 @@ importers:
.:
dependencies:
'@aws-sdk/client-s3':
specifier: ^3.637.0
specifier: 3.637.0
version: 3.637.0
'@aws-sdk/lib-storage':
specifier: ^3.637.0

@ -144,12 +144,12 @@ export interface VodResponse {
title?: string;
date: string;
mux_asset: MuxAssetRecord;
thumbnail?: S3File;
thumbnail?: S3FileResponse;
vtuber: VtuberRecord;
tags?: TagRecord[];
timestamps?: Timestamp[];
ipfs_cid?: string;
s3_file?: S3File;
s3_file?: S3FileResponse;
torrent?: string;
announce_title?: string;
announce_url?: string;
@ -174,7 +174,7 @@ export interface Stream {
updated_at: Date;
vtuber: string;
tweet: string;
vods?: Vod[];
vods?: VodResponse[];
archive_status: ArchiveStatus;
is_chaturbate_stream: Boolean;
is_fansly_stream: Boolean;

@ -7,4 +7,6 @@ export function fpSlugify(str: string): string {
locale: 'en',
trim: true,
});
}
}
export const ua0 = 'Mozilla/5.0 (X11; Linux x86_64; rv:105.0) Gecko/20100101 Firefox/105.0'

@ -15,4 +15,4 @@
## every 1 minutes, we see which /vods are stale and we mark them as such.
## this prevents stalled Record updates by marking stalled recordings as stopped
* * * * * update_vod_statuses ?max=1 { stalled_minutes:1, finished_minutes:2 }
* * * * * update_vod_statuses ?max=1 { stalled_minutes:2, finished_minutes:3 }

@ -25,6 +25,7 @@
"dependencies": {
"@discordeno/bot": "19.0.0-next.b1bfe94",
"@discordeno/rest": "19.0.0-next.b3a8c86",
"@futureporn/fetchers": "workspace:^",
"@paralleldrive/cuid2": "^2.2.2",
"@types/node": "^22.5.2",
"@types/qs": "^6.9.15",

@ -14,6 +14,9 @@ importers:
'@discordeno/rest':
specifier: 19.0.0-next.b3a8c86
version: 19.0.0-next.b3a8c86
'@futureporn/fetchers':
specifier: workspace:^
version: link:../../packages/fetchers
'@paralleldrive/cuid2':
specifier: ^2.2.2
version: 2.2.2

@ -1,11 +1,11 @@
import { ApplicationCommandTypes, logger, type Interaction } from '@discordeno/bot'
import { createCommand } from '../commands.ts'
import getStreamFromDatabase from '../fetchers/getStreamFromDatabase.ts'
import patchVod from '../fetchers/patchVod.ts'
import getStreamFromDatabase from '@futureporn/fetchers/getStreamFromDatabase.ts'
import patchVod from '@futureporn/fetchers/patchVod.ts'
import { quickAddJob, type WorkerUtilsOptions } from 'graphile-worker'
import { configs } from '../config.ts'
import findVod from '../fetchers/findVod.ts'
import findVod from '@futureporn/fetchers/findVod.ts'
function throwErr(msg: string) {
logger.error(msg)

@ -11,10 +11,10 @@ import { rbacAllow } from '../middlewares/rbac.ts'
import { createCommand } from '../commands.ts'
import { configs } from '../config.ts'
import type { Stream } from '@futureporn/types'
import createStreamInDatabase from '../fetchers/createStreamInDatabase.ts'
import createVod from '../fetchers/createVod.ts'
import findOrCreateVtuber from '../fetchers/findOrCreateVtuber.ts'
import findOrCreateStream from '../fetchers/findOrCreateStream.ts'
import createStreamInDatabase from '@futureporn/fetchers/createStreamInDatabase.ts'
import createVod from '@futureporn/fetchers/createVod.ts'
import findOrCreateVtuber from '@futureporn/fetchers/findOrCreateVtuber.ts'
import findOrCreateStream from '@futureporn/fetchers/findOrCreateStream.ts'
/**

@ -1,28 +0,0 @@
import { configs } from "../config.ts"
import type { VodRecord } from "@futureporn/types"
export default async function createVod(payload: Partial<VodRecord>): Promise<string> {
const vodPayload = {
date: new Date().toISOString()
}
const res = await fetch(`${configs.postgrestUrl}/vods`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Prefer': 'return=headers-only',
'Authorization': `Bearer ${configs.automationUserJwt}`,
},
body: JSON.stringify(Object.assign(vodPayload, payload))
})
if (!res.ok) {
const status = res.status
const statusText = res.statusText
const body = await res.text()
const msg = `Failed to create vod in database. status=${status}, statusText=${statusText}, body=${body}`
console.error(msg)
throw new Error(msg)
}
const id = res.headers.get('location')?.split('.').at(-1)
if (!id) throw new Error('id could not be parsed from location header');
return id
}

@ -1,7 +1,7 @@
import 'dotenv/config'
import type { Status, Stream, SegmentResponse, VodRecord, VodResponse } from '@futureporn/types'
import type { Status, SegmentResponse, VodResponse } from '@futureporn/types'
import { type Task, type Helpers } from 'graphile-worker'
import { intervalToDuration, formatDuration, isBefore, sub, max } from 'date-fns'
import { intervalToDuration, formatDuration } from 'date-fns'
import prettyBytes from 'pretty-bytes'
import {
EmbedsBuilder,
@ -13,7 +13,7 @@ import {
} from '@discordeno/bot'
import { bot } from '../bot.ts'
import { configs } from '../config.ts'
import findVod from '../fetchers/findVod.ts'
import findVod from '@futureporn/fetchers/findVod.ts'
const yeahEmojiId = BigInt('1253191939461873756')
@ -89,6 +89,7 @@ function getEmbeds(vod: VodResponse, helpers: Helpers) {
{ name: 'Status', value: status.charAt(0).toUpperCase()+status.slice(1), inline: true },
// { name: 'Filesize', value: prettyBytes(fileSize), inline: true }, // filesize isn't on stream. filesize is on segment. keeping for reference. @todo
{ name: 'URL', value: url, inline: false },
{ name: 'CDN2', value: '@todo', inline: false },
])
if (status === 'pending_recording') {
embeds
@ -132,12 +133,14 @@ function getEmbeds(vod: VodResponse, helpers: Helpers) {
.setFields(segments.sort((a, b) => new Date(a.created_at).getTime() - new Date(b.created_at).getTime()).map((s, i) => (
{
name: `Segment ${i+1}`,
value: `${getDuration(s)} (${prettyBytes(s.bytes)})`,
value: `${getDuration(s)} (${prettyBytes(s.bytes)}) (${s.s3_key})`,
inline: false
}
)))
}
// Add an embed for S3 files
// Add an Embed for processing tasks
if (status === 'processing') {
const tasks = [

@ -0,0 +1,11 @@
import { updateStalledVods } from "./update_vod_statuses.ts"
describe('update_vod_statuses', function () {
describe('updateStalledVods', function () {
describe('integration', function () {
xit("Should fetch a list of VODs that haven't been updated in the past 2 minutes", function () {
// const
})
})
})
})

@ -18,7 +18,7 @@ function assertPayload(payload: any): asserts payload is Payload {
if (typeof payload.finished_minutes !== 'number') throw new Error(`finished_minutes parameter was not a number`);
}
async function updateFinishedVods({
export async function updateFinishedVods({
helpers,
finished_minutes,
url
@ -60,21 +60,22 @@ async function updateFinishedVods({
}
}
async function updateStalledVods({
export async function updateStalledVods({
helpers,
url,
stalled_minutes = 1,
}: {
helpers: Helpers,
url: string,
stalled_minutes?: number,
stalled_minutes: number,
}) {
// Identify and update stalled vods
// note: we are checking /vods update_at which gets updated whenever a /segments gets updated (thanks to postgres trigger functions)
const stalledTimestamp = sub(new Date(), { minutes: stalled_minutes }).toISOString();
const stalledQueryOptions = {
select: 'status,id,segments!inner(updated_at)',
'segments.updated_at': `lt.${stalledTimestamp}`,
or: '(status.eq.pending_recording,status.eq.recording)',
'select': 'status,id',
'updated_at': `lt.${stalledTimestamp}`,
'or': '(status.eq.pending_recording,status.eq.recording)',
};
const stalledUpdatePayload = {
status: 'stalled',
@ -100,6 +101,9 @@ async function updateStalledVods({
helpers.logger.error(stalledBody);
return;
}
// const stalledResHeaders = stalledRes.headers.get('')
// console.log(stalledRes)
// console.log(stalledResBody)
}
export const update_vod_statuses: Task = async function (payload: unknown, helpers: Helpers) {

@ -25,7 +25,7 @@
// Include the necessary files for your project
"include": [
"**/*.ts"
, "src/events/interactionCreate.ts.old" ],
, "src/events/interactionCreate.ts.old", "../../packages/fetchers/createStreamInDatabase.ts", "../../packages/fetchers/createVod.ts", "../../packages/fetchers/findOrCreateStream.ts", "../../packages/fetchers/findOrCreateVtuber.spec.ts", "../../packages/fetchers/findOrCreateVtuber.ts", "../../packages/fetchers/findVod.ts", "../../packages/fetchers/getStreamFromDatabase.ts", "../../packages/fetchers/getStreamIdFromMessage.ts", "../../packages/fetchers/getUrlFromMessage.ts", "../../packages/fetchers/patchVod.ts" ],
"exclude": [
"node_modules"
]

@ -4,7 +4,7 @@
"version": "0.3.5",
"license": "Unlicense",
"private": true,
"packageManager": "pnpm@9.5.0",
"packageManager": "pnpm@9.6.0",
"scripts": {
"start": "node dist/index.js",
"build": "tsup",
@ -21,6 +21,7 @@
"@aws-sdk/client-s3": "^3.637.0",
"@aws-sdk/lib-storage": "^3.637.0",
"@aws-sdk/types": "^3.609.0",
"@futureporn/fetchers": "workspace:^",
"@futureporn/types": "workspace:^",
"@futureporn/utils": "workspace:^",
"@paralleldrive/cuid2": "^2.2.2",
@ -46,6 +47,7 @@
"https": "^1.0.0",
"ioredis": "^5.4.1",
"minimatch": "^10.0.1",
"nanoid": "^5.0.7",
"p-retry": "^6.2.0",
"pg-boss": "^10.1.1",
"pino-pretty": "^11.2.2",

@ -17,6 +17,9 @@ importers:
'@aws-sdk/types':
specifier: ^3.609.0
version: 3.609.0
'@futureporn/fetchers':
specifier: workspace:^
version: link:../../packages/fetchers
'@futureporn/types':
specifier: workspace:^
version: link:../../packages/types
@ -92,6 +95,9 @@ importers:
minimatch:
specifier: ^10.0.1
version: 10.0.1
nanoid:
specifier: ^5.0.7
version: 5.0.7
p-retry:
specifier: ^6.2.0
version: 6.2.0
@ -1951,6 +1957,11 @@ packages:
nan@2.20.0:
resolution: {integrity: sha512-bk3gXBZDGILuuo/6sKtr0DQmSThYHLtNCdSdXk9YkxD/jK6X2vmCyyXBBxyqZ4XcnzTyYEAThfX3DCEnLf6igw==}
nanoid@5.0.7:
resolution: {integrity: sha512-oLxFY2gd2IqnjcYyOXD8XGCftpGtZP2AbHbOkthDkvRywH5ayNtPVy9YlOPcHckXzbLTCHpkb7FB+yuxKV13pQ==}
engines: {node: ^18 || >=20}
hasBin: true
neotraverse@0.6.18:
resolution: {integrity: sha512-Z4SmBUweYa09+o6pG+eASabEpP6QkQ70yHj351pQoEXIs8uHbaU2DWVmzBANKgflPa47A50PtB2+NgRpQvr7vA==}
engines: {node: '>= 10'}
@ -4996,6 +5007,8 @@ snapshots:
nan@2.20.0: {}
nanoid@5.0.7: {}
neotraverse@0.6.18: {}
nise@5.1.9:

@ -1,10 +1,15 @@
import { spawn } from 'child_process';
import { EventEmitter, PassThrough, pipeline, Readable } from 'stream';
import prettyBytes from 'pretty-bytes';
import { PassThrough, pipeline, Readable } from 'stream';
import EventEmitter from 'events';
import { Upload } from "@aws-sdk/lib-storage";
import { S3Client } from "@aws-sdk/client-s3";
import { CompleteMultipartUploadCommand, S3Client } from "@aws-sdk/client-s3";
import 'dotenv/config'
import { promisify } from 'util';
// @see https://nodejs.org/api/events.html#capture-rejections-of-promises
EventEmitter.captureRejections = true;
const pipelinePromise = promisify(pipeline)
const ua0 = 'Mozilla/5.0 (X11; Linux x86_64; rv:105.0) Gecko/20100101 Firefox/105.0'
export class UploadStreamClosedError extends Error {
@ -49,6 +54,7 @@ export default class Record {
date?: string;
abortSignal: AbortSignal;
onProgress: Function;
upload?: Upload;
constructor({ inputStream, s3Client, bucket, jobId, abortSignal, onProgress }: RecordArgs) {
if (!inputStream) throw new Error('Record constructor was missing inputStream.');
@ -67,6 +73,7 @@ export default class Record {
this.uploadStream = new PassThrough()
this.abortSignal = abortSignal
this.abortSignal.addEventListener("abort", this.abortEventListener.bind(this))
this.upload
}
@ -127,7 +134,7 @@ export default class Record {
// greets https://stackoverflow.com/a/70159394/1004931
try {
const parallelUploads3 = new Upload({
this.upload = new Upload({
client: this.s3Client,
partSize: 1024 * 1024 * 5,
queueSize: 1,
@ -135,8 +142,7 @@ export default class Record {
params: target,
});
parallelUploads3.on("httpUploadProgress", (progress) => {
this.upload.on("httpUploadProgress", (progress) => {
if (progress?.loaded) {
// console.log(progress)
if (this.onProgress) this.onProgress(this.counter);
@ -146,15 +152,41 @@ export default class Record {
}
});
console.log('Waiting for parallelUploads3 to finish...')
await parallelUploads3.done();
console.log('parallelUploads3 is complete.')
console.log(`Uploading to bucket=${this.bucket}. Waiting for parallelUploads3 to finish...`)
// @todo there is a problem that results in COMPLETE LOSS OF SEGMENT DATA.
// when the download stream closes before the upload stream, I think the upload stream gets cut off.
// this means the upload isn't allowed to save and that means no data whatsoever gets put to S3.
// is that right? IDK what's happening, but we don't get any segment data on S3 at all??
// Ok I just checked the Backblaze dashboard and we are uploading. Backblaze says the bytes are at 0 but
// it also shows a partial upload of 550MB which matches what capture-worker is showing has been captured so far.
// So I think what is happening is the upload is happening, but it's not finishing.
// It looks like the finish is only allowed to happen under completely normal circumstances.
// However, the segment upload may fail in production, and we need to let the upload finish even then.
//
// I think I need to call CompleteMultipartUpload. https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
// Yes, that's right. Apparently parallelUploads3.done() returns a Promise which will resolve to CompleteMultipartUploadCommandOutput.
// But because of the catch, that promise will never resolve?
// What happens to an in-progress Promise when an error is thrown?
// await this.upload.done();
// console.log('Upload is complete.')
// return this.uploadStream
} catch (e) {
// if we got an abort error, e.name is not AbortError as expected. Instead, e.name is Error.
// so in order to catch AbortError, we don't even look there. instead, we check if our abortcontroller was aborted.
// in other words, `(e.name === 'AbortError')` will never be true.
if (this.abortSignal.aborted) return;
if (this.abortSignal.aborted) {
console.error('While uploading, the upload was aborted.')
setTimeout(async () => {
await this.upload?.abort()
}, 1000)
// if (this.upload) {
// const command = new CompleteMultipartUploadCommand()
// }
return;
}
if (e instanceof Error) {
console.error(`We were uploading a file to S3 but then we encountered an exception!`)
@ -191,10 +223,6 @@ export default class Record {
console.error('there was an error on the uploadStream. error as follows')
console.error(e)
})
// T.M.I.
// this.uploadStream.on('drain', () => {
// console.info('[vvv] drain on uploadStream.')
// })
// input stream event handlers
this.inputStream.on('close', () => {
@ -211,22 +239,20 @@ export default class Record {
// pipe the ffmpeg stream to the S3 upload stream
// this has the effect of uploading the stream to S3 at the same time we're recording it.
pipeline(
console.log(`>>> awaiting pipeline promise`)
const streamPromise = pipelinePromise(
this.inputStream,
this.uploadStream,
(err) => {
if (err) {
console.error(`pipeline errored.`)
console.error(err)
} else {
console.log('pipeline succeeded.')
}
}
this.uploadStream
)
console.log('awaiting uploadToS3()...')
await this.uploadToS3()
console.log('uploadToS3() is complete.')
this.uploadToS3()
await Promise.all([streamPromise, this.upload?.done()])
// console.log('awaiting uploadToS3()...')
// await this.uploadToS3()
// console.log('uploadToS3() is complete.')
console.log(`streamPromise completed with jobId=${this.jobId}, keyName=${this.keyName}`)
return {
jobId: this.jobId,

@ -0,0 +1,15 @@
import RecordNextGeneration from './RecordNextGeneration.ts'
import getVod from '@futureporn/fetchers/getVod.ts'
import createVod from '@futureporn/fetchers/createVod.ts'
describe('RecordNextGeneration', function () {
describe('integration', function () {
it('should stream to S3', async function () {
this.timeout(30000)
const cv = await createVod()
if (!cv) throw new Error('failed to get vod from createVod()');
const recordNG = new RecordNextGeneration({ url: 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-06-15.mp4', vodId: cv.id })
const uv = await getVod(cv.id)
})
})
})

@ -0,0 +1,213 @@
import { VodResponse } from "@futureporn/types"
import getVod from '@futureporn/fetchers/getVod.ts'
import { PassThrough, Readable } from "stream"
import { ua0 } from '@futureporn/utils/name.ts'
import { spawn } from 'child_process'
import { pipeline } from "stream/promises"
import { configs } from "./config"
import { nanoid } from 'nanoid'
import { Upload } from "@aws-sdk/lib-storage"
import { S3Client, type S3ClientConfig } from '@aws-sdk/client-s3'
export interface RecordNextGenerationArguments {
vodId: string;
url: string;
}
export default class RecordNextGeneration {
public vodId: string;
public url: string;
private vod?: VodResponse|null;
private downloadStream?: Readable;
private uploadStream?: PassThrough;
private upload?: Upload;
private streamPipeline?: Promise<void>;
constructor({ vodId, url }: RecordNextGenerationArguments) {
this.vodId = vodId
this.url = url
// const outputStream = createWriteStream('/dev/null')
// setInterval(() => { inputStream.push('simulated downloader bytes received') }, 50)
// setTimeout(() => { inputStream.destroy() })
}
static getMultipartUpload({ client, bucket, key, body }: { client: S3Client, bucket: string, key: string, body: Readable }) {
const params = {
Bucket: bucket,
Key: key,
Body: body
}
const upload = new Upload({
client,
partSize: 1024 * 1024 * 5,
queueSize: 1,
leavePartsOnError: true,
params
})
upload.on("httpUploadProgress", (progress) => {
if (progress?.loaded) {
console.log(progress)
// if (this.onProgress) this.onProgress(this.counter);
// console.log(`uploaded ${progress.loaded} bytes (${prettyBytes(progress.loaded)})`);
} else {
console.log(`httpUploadProgress ${JSON.stringify(progress, null, 2)}`)
}
});
return upload
}
// static deleteme () {
// // @todo there is a problem that results in COMPLETE LOSS OF SEGMENT DATA.
// // when the download stream closes before the upload stream, I think the upload stream gets cut off.
// // this means the upload isn't allowed to save and that means no data whatsoever gets put to S3.
// // is that right? IDK what's happening, but we don't get any segment data on S3 at all??
// // Ok I just checked the Backblaze dashboard and we are uploading. Backblaze says the bytes are at 0 but
// // it also shows a partial upload of 550MB which matches what capture-worker is showing has been captured so far.
// // So I think what is happening is the upload is happening, but it's not finishing.
// // It looks like the finish is only allowed to happen under completely normal circumstances.
// // However, the segment upload may fail in production, and we need to let the upload finish even then.
// //
// // I think I need to call CompleteMultipartUpload. https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
// // Yes, that's right. Apparently parallelUploads3.done() returns a Promise which will resolve to CompleteMultipartUploadCommandOutput.
// // But because of the catch, that promise will never resolve?
// // What happens to an in-progress Promise when an error is thrown?
// // await this.upload.done();
// // console.log('Upload is complete.')
// // return this.uploadStream
// } catch (e) {
// // if we got an abort error, e.name is not AbortError as expected. Instead, e.name is Error.
// // so in order to catch AbortError, we don't even look there. instead, we check if our abortcontroller was aborted.
// // in other words, `(e.name === 'AbortError')` will never be true.
// if (this.abortSignal.aborted) {
// console.error('While uploading, the upload was aborted.')
// setTimeout(async () => {
// await this.upload?.abort()
// }, 1000)
// // if (this.upload) {
// // const command = new CompleteMultipartUploadCommand()
// // }
// return;
// }
// if (e instanceof Error) {
// console.error(`We were uploading a file to S3 but then we encountered an exception!`)
// console.error(e)
// throw e
// } else {
// throw new Error(`error of some sort ${JSON.stringify(e, null, 2)}`)
// }
// }
// }
static getFFmpegStream({ url }: { url: string }): Readable {
console.log(`getFFmpegStream using url=${url}`)
const ffmpegProc = spawn('ffmpeg', [
'-headers', `"User-Agent: ${ua0}"`,
'-i', url,
'-c:v', 'copy',
'-c:a', 'copy',
'-movflags', 'faststart',
'-y',
'-f', 'mpegts',
'-loglevel', 'quiet',
'pipe:1'
], {
// ignoring stderr is important because if not, ffmpeg will fill that buffer and node will hang
stdio: ['pipe', 'pipe', 'ignore']
})
return ffmpegProc.stdout
}
/**
* done() waits for the recording to be complete.
*/
async done() {
// @todo [x] get the vod
// @todo [ ] get the download stream
// @todo [ ] pipe the download stream to the upload stream
// @todo [ ] start the upload(?)
// @todo [ ] await all the promised resolutions
// @todo [ ] handle errors
// @todo [ ] cleanup
// @todo - [ ] send S3 complete upload command if necessary
try {
this.vod = await getVod(this.vodId)
if (!this.vod) throw new Error(`RecordNextGeneration failed to fetch vod ${this.vodId}`)
if (this.vod.is_recording_aborted) {
console.info(`vod ${this.vodId} is aborted.`)
return
}
const clientOptions: S3ClientConfig = {
endpoint: configs.s3Endpoint,
region: configs.s3Region,
credentials: {
accessKeyId: configs.s3AccessKeyId,
secretAccessKey: configs.s3SecretAccessKey,
},
}
const client = new S3Client(clientOptions)
const bucket = configs.s3UscBucket
const key = `@todo-${nanoid()}`
this.downloadStream = RecordNextGeneration.getFFmpegStream({ url: this.url })
this.uploadStream = new PassThrough()
this.upload = RecordNextGeneration.getMultipartUpload({ client, bucket: configs.s3UscBucket, key, body: this.downloadStream })
this.streamPipeline = pipeline(this.downloadStream, this.uploadStream)
return Promise.all([
this.streamPipeline,
this.upload.done()
])
} catch (e) {
switch (e) {
case (e instanceof Error && e.name === 'RoomOfflineError'):
// if the room is offline, we re-throw the RoomOfflineError so the recording gets retried
// we do this because the offline might be a temporary situation.
// e.g. streamer's computer bluescreened and they're coming back after they reboot.
throw e
case (e instanceof Error && e.name === 'AbortError'):
// An admin aborted which means we don't want to retry.
// we return and the Task gets marked as successful.
return
default:
console.error(`!!!!!!!!!!!!!! switch/case defaulted which should probably never happen. Please patch the code to handle this scenario.`)
console.error(`!!!!!!!!!!!!!! switch/case defaulted which should probably never happen. Please patch the code to handle this scenario.`)
console.error(`!!!!!!!!!!!!!! switch/case defaulted which should probably never happen. Please patch the code to handle this scenario.`)
console.error((e instanceof Error) ? e.message : JSON.stringify(e))
}
} finally {
// @todo cleanup
console.info(`finally block is running now.`)
console.info('@todo cleanup')
}
}
}

@ -1,11 +1,9 @@
'use strict'
import { build } from './app.ts'
import chai, { expect } from "chai"
import { use, expect } from "chai"
import sinonChai from 'sinon-chai'
import sinon from 'sinon'
import { makeWorkerUtils } from 'graphile-worker'
chai.use(sinonChai)
use(sinonChai)
describe('app', function () {
const app = build({}, 'postgres://')

@ -6,7 +6,7 @@ const requiredEnvVars = [
'S3_SECRET_ACCESS_KEY',
'S3_REGION',
'S3_ENDPOINT',
'S3_BUCKET',
'S3_USC_BUCKET',
'POSTGREST_URL',
'AUTOMATION_USER_JWT',
] as const;
@ -26,7 +26,7 @@ export interface Config {
s3AccessKeyId: string;
s3SecretAccessKey: string;
s3Region: string;
s3Bucket: string;
s3UscBucket: string;
s3Endpoint: string;
}
@ -38,6 +38,6 @@ export const configs: Config = {
s3AccessKeyId: getEnvVar('S3_ACCESS_KEY_ID'),
s3SecretAccessKey: getEnvVar('S3_SECRET_ACCESS_KEY'),
s3Region: getEnvVar('S3_REGION'),
s3Bucket: getEnvVar('S3_BUCKET'),
s3UscBucket: getEnvVar('S3_USC_BUCKET'),
s3Endpoint: getEnvVar('S3_ENDPOINT'),
}

@ -49,7 +49,6 @@ async function main() {
const playlistUrl: string = await new Promise((resolve, reject) => {
// get the m3u8 playlist for the livestream
const ytdlp = spawn('yt-dlp', [ '-g', randomRoom.url ])
let output = ''

@ -1,15 +1,16 @@
import updateSegmentInDatabase from '../fetchers/updateSegmentInDatabase.ts'
import updateSegmentInDatabase from '@futureporn/fetchers/updateSegmentInDatabase.ts'
import { Helpers, type Task } from 'graphile-worker'
import Record from '../Record.ts'
import type { SegmentResponse, ScoutResponse } from '@futureporn/types'
import type { SegmentResponse } from '@futureporn/types'
import { configs } from '../config.ts'
import { createId } from '@paralleldrive/cuid2'
import createSegmentInDatabase from '../fetchers/createSegmentInDatabase.ts'
import createSegmentsVodLink from '../fetchers/createSegmentsVodLink.ts'
import getPlaylistUrl from '../fetchers/getPlaylistUrl.ts'
import { String } from 'aws-sdk/clients/acm'
import createSegmentInDatabase from '@futureporn/fetchers/createSegmentInDatabase.ts'
import createSegmentsVodLink from '@futureporn/fetchers/createSegmentsVodLink.ts'
import getPlaylistUrl from '@futureporn/fetchers/getPlaylistUrl.ts'
import getVod from '@futureporn/fetchers/getVod.ts'
import RecordNextGeneration from '../RecordNextGeneration.ts'
/**
* url is the URL to be recorded. Ex: chaturbate.com/projektmelody
@ -30,40 +31,39 @@ function assertPayload(payload: any): asserts payload is Payload {
}
async function getRecordInstance(url: string, segment_id: string, helpers: Helpers) {
helpers.logger.info(`getRecordInstance() with url=${url}, segment_id=${segment_id}`)
const abortController = new AbortController()
const abortSignal = abortController.signal
const accessKeyId = configs.s3AccessKeyId;
const secretAccessKey = configs.s3SecretAccessKey;
const region = configs.s3Region;
const endpoint = configs.s3Endpoint;
const bucket = configs.s3Bucket;
const playlistUrl = await getPlaylistUrl(url)
if (!playlistUrl) throw new Error('failed to getPlaylistUrl');
helpers.logger.info(`playlistUrl=${playlistUrl}`)
const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint })
const inputStream = Record.getFFmpegStream({ url: playlistUrl })
const onProgress = (fileSize: number) => {
updateSegmentInDatabase({ segment_id, fileSize, helpers })
.then(checkIfAborted)
.then((isAborted) => {
isAborted ? abortController.abort() : null
})
.catch((e) => {
helpers.logger.error('caught error while updatingDatabaseRecord inside onProgress inside getRecordInstance')
helpers.logger.error(e)
})
}
const record = new Record({ inputStream, onProgress, bucket, s3Client, jobId: ''+segment_id, abortSignal })
return record
}
// async function getRecordInstance(url: string, segment_id: string, helpers: Helpers) {
// helpers.logger.info(`getRecordInstance() with url=${url}, segment_id=${segment_id}`)
// const abortController = new AbortController()
// const abortSignal = abortController.signal
// const accessKeyId = configs.s3AccessKeyId;
// const secretAccessKey = configs.s3SecretAccessKey;
// const region = configs.s3Region;
// const endpoint = configs.s3Endpoint;
// const bucket = configs.s3UscBucket;
// const playlistUrl = await getPlaylistUrl(url)
// if (!playlistUrl) throw new Error('failed to getPlaylistUrl');
// helpers.logger.info(`playlistUrl=${playlistUrl}`)
// const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint })
// const inputStream = Record.getFFmpegStream({ url: playlistUrl })
// const onProgress = (fileSize: number) => {
// updateSegmentInDatabase({ segment_id, fileSize, helpers })
// .then(checkIfAborted)
// .then((isAborted) => {
// isAborted ? abortController.abort() : null
// })
// .catch((e) => {
// helpers.logger.error('caught error while updatingDatabaseRecord inside onProgress inside getRecordInstance')
// helpers.logger.error(e)
// })
// }
function checkIfAborted(segment: Partial<SegmentResponse>): boolean {
// console.log(`checkIfAborted with following segment`)
// console.log(segment)
return (!!segment?.vod?.is_recording_aborted)
}
// const record = new Record({ inputStream, onProgress, bucket, s3Client, jobId: ''+segment_id, abortSignal })
// return record
// }
// function checkIfAborted(segment: Partial<SegmentResponse>): boolean {
// return (!!segment?.vod?.is_recording_aborted)
// }
@ -79,16 +79,19 @@ function checkIfAborted(segment: Partial<SegmentResponse>): boolean {
*
* This function also names the S3 file (s3_key) with a datestamp and a cuid.
*/
const doRecordSegment = async function doRecordSegment(url: string, vod_id: string, helpers: Helpers) {
const s3_key = `${new Date().toISOString()}-${createId()}.ts`
helpers.logger.info(`let's create a segment using vod_id=${vod_id}, url=${url}`)
const segment_id = await createSegmentInDatabase(s3_key, vod_id, helpers)
helpers.logger.info(`let's create a segmentsStreamLink...`)
const segmentsVodLinkId = await createSegmentsVodLink(vod_id, segment_id, helpers)
helpers.logger.info(`doTheRecording with createSegmentsVodLink segmentsVodLinkId=${segmentsVodLinkId}, vod_id=${vod_id}, segment_id=${segment_id}, url=${url}`)
const record = await getRecordInstance(url, segment_id, helpers)
return record.start()
}
// const doRecordSegment = async function doRecordSegment(url: string, vod_id: string, helpers: Helpers) {
// const s3_key = `${new Date().toISOString()}-${createId()}.ts`
// helpers.logger.info(`let's create a segment using vod_id=${vod_id}, url=${url}`)
// const segment_id = await createSegmentInDatabase(s3_key, vod_id)
// helpers.logger.info(`let's create a segmentsStreamLink...`)
// const segmentsVodLinkId = await createSegmentsVodLink(vod_id, segment_id)
// helpers.logger.info(`doTheRecording with createSegmentsVodLink segmentsVodLinkId=${segmentsVodLinkId}, vod_id=${vod_id}, segment_id=${segment_id}, url=${url}`)
// // no try-catch block here, because we need any errors to bubble up.
// const record = await getRecordInstance(url, segment_id)
// helpers.logger.info(`we got a Record instance. now we record.start()`)
// // console.log(record)
// return record.start()
// }
@ -98,42 +101,60 @@ export const record: Task = async function (payload: unknown, helpers: Helpers)
assertPayload(payload)
const { url, vod_id } = payload
const vodId = vod_id
try {
/**
* We do an exponential backoff timer when we record. If the Record() instance throws an error, we try again after a delay.
* This will take effect only when Record() throws an error.
* If however Record() returns, as is the case when the stream ends, this backoff timer will not retry.
* This does not handle the corner case where the streamer's internet temporarliy goes down, and their stream drops.
*
* @todo We must implement retrying at a higher level, and retry a few times to handle this type of corner-case.
*/
// await backOff(() => doRecordSegment(url, recordId, helpers))
await doRecordSegment(url, vodId, helpers)
} catch (e) {
// await updateDatabaseRecord({ recordId: vod_id, recordingState: 'failed' })
helpers.logger.error(`caught an error during record Task`)
if (e instanceof Error) {
helpers.logger.info(`error.name=${e.name}`)
if (e.name === 'RoomOfflineError') {
// If room is offline, we want to retry until graphile-worker retries expire.
// We don't want to swallow the error so we simply log the error then let the below throw re-throw the error
// graphile-worker will retry when we re-throw the error below.
helpers.logger.info(`Room is offline.`)
} else if (e.name === 'AbortError') {
// If the recording was aborted by an admin, we want graphile-worker to stop retrying the record job.
// We swallow the error and return in order to mark the job as succeeded.
helpers.logger.info(`>>> we got an AbortError so we are ending the record job.`)
return
} else {
helpers.logger.error(e.message)
}
} else {
helpers.logger.error(JSON.stringify(e))
}
// we throw the error which fails the graphile-worker job, thus causing graphile-worker to restart/retry the job.
helpers.logger.error(`we got an error during record Task so we throw and retry`)
throw e
}
/**
* RecordNextGeneration handles errors for us and re-throws ones that should fail the Task.
* We intentionally do not use a try/catch block here.
*/
const recordNG = new RecordNextGeneration({ url, vodId })
await recordNG.done()
return;
// try {
// // if the VOD has been aborted, end Task with success
// if ((await getVod(vod_id, helpers))?.is_recording_aborted) return;
// /**
// * We do an exponential backoff timer when we record. If the Record() instance throws an error, we try again after a delay.
// * This will take effect only when Record() throws an error.
// * If however Record() returns, as is the case when the stream ends, this backoff timer will not retry.
// * This does not handle the corner case where the streamer's internet temporarliy goes down, and their stream drops.
// *
// * @todo We must implement retrying at a higher level, and retry a few times to handle this type of corner-case.
// */
// // await backOff(() => doRecordSegment(url, recordId, helpers))
// await doRecordSegment(url, vodId, helpers)
// } catch (e) {
// // await updateDatabaseRecord({ recordId: vod_id, recordingState: 'failed' })
// helpers.logger.error(`caught an error during record Task`)
// if (e instanceof Error) {
// helpers.logger.info(`error.name=${e.name}`)
// if (e.name === 'RoomOfflineError') {
// // If room is offline, we want to retry until graphile-worker retries expire.
// // We don't want to swallow the error so we simply log the error then let the below throw re-throw the error
// // graphile-worker will retry when we re-throw the error below.
// helpers.logger.info(`Room is offline.`)
// } else if (e.name === 'AbortError') {
// // If the recording was aborted by an admin, we want graphile-worker to stop retrying the record job.
// // We swallow the error and return in order to mark the job as succeeded.
// helpers.logger.info(`>>> we got an AbortError so we are ending the record job.`)
// return
// } else {
// helpers.logger.error(e.message)
// }
// } else {
// helpers.logger.error(JSON.stringify(e))
// }
// // we throw the error which fails the graphile-worker job, thus causing graphile-worker to restart/retry the job.
// helpers.logger.error(`we got an error during record Task so we throw and retry`)
// throw e
// }
// helpers.logger.info('record Task has finished')
}

@ -22,6 +22,7 @@
"dependencies": {
"@aws-sdk/client-s3": "^3.637.0",
"@aws-sdk/lib-storage": "^3.637.0",
"@futureporn/fetchers": "workspace:^",
"@futureporn/storage": "workspace:^",
"@futureporn/utils": "workspace:^",
"@paralleldrive/cuid2": "^2.2.2",

@ -14,6 +14,9 @@ importers:
'@aws-sdk/lib-storage':
specifier: ^3.637.0
version: 3.637.0(@aws-sdk/client-s3@3.637.0)
'@futureporn/fetchers':
specifier: workspace:^
version: link:../../packages/fetchers
'@futureporn/storage':
specifier: workspace:^
version: link:../../packages/storage

@ -13,9 +13,10 @@ import { createReadStream, createWriteStream, write } from 'node:fs';
import { writeFile, readFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { promisify } from 'node:util';
import patchVodInDatabase from '../fetchers/patchVodInDatabase'
import patchVodInDatabase from '@futureporn/fetchers/patchVodInDatabase.ts'
import { downloadFile } from '@futureporn/storage/s3.ts';
import { S3FileRecord } from '@futureporn/types';
import { S3FileRecord, VodRecord } from '@futureporn/types';
const pipelinePromise = promisify(pipeline)
interface s3ManifestEntry {
key: string;
@ -108,20 +109,6 @@ const concatVideos = async function (videoFilePaths: string[]): Promise<string>
const setupUploadPipeline = function ({ inputStream, uploadStream }: { inputStream: Readable, uploadStream: PassThrough }) {
pipeline(
inputStream,
uploadStream,
(err: any) => {
if (err) {
console.error(`upload pipeline errored.`)
console.error(err)
} else {
console.log('upload pipeline succeeded.')
}
}
)
}
const getS3ParallelUpload = async function ({
filePath,
@ -138,7 +125,7 @@ const getS3ParallelUpload = async function ({
const uploadStream = new PassThrough()
const target: S3Target = {
Bucket: configs.s3Bucket,
Bucket: configs.s3UscBucket,
Key: s3KeyName,
Body: uploadStream
}
@ -165,7 +152,7 @@ export const combine_video_segments: Task = async function (payload: unknown, he
assertPayload(payload)
const { s3_manifest, vod_id } = payload
if (!vod_id) throw new Error('combine_video_segments was called without a vod_id.');
helpers.logger.info(`combine_video_segments started with s3_manifest=${JSON.stringify(s3_manifest)}, vod_id=${vod_id}`)
helpers.logger.info(`🏗️ combine_video_segments started with s3_manifest=${JSON.stringify(s3_manifest)}, vod_id=${vod_id}`)
/**
* Here we take a manifest of S3 files and we download each of them.
@ -198,17 +185,13 @@ export const combine_video_segments: Task = async function (payload: unknown, he
const inputStream = createReadStream(concatenatedVideoFile)
const filePath = concatenatedVideoFile
const { uploadStream, upload } = await getS3ParallelUpload({ client, s3KeyName, filePath })
setupUploadPipeline({ inputStream, uploadStream })
pipelinePromise(inputStream, uploadStream)
await upload.done()
if (!vod_id) throw new Error('vod_id was missing from payload');
// update the vod with the s3_file of the combined video
const s3File: S3FileRecord = {
s3_key: s3KeyName,
bucket: configs.s3UscBucket,
}
const payload = {
s3_file: s3File,
s3_file: s3KeyName,
vod_id
}
await patchVodInDatabase(vod_id, payload)

@ -1,8 +1,8 @@
import type { Task, Helpers } from "graphile-worker";
import getVod from "../fetchers/getVod";
import getVod from "@futureporn/fetchers/getVod";
import { getStoryboard } from '@futureporn/utils/image.ts'
import { getCdnUrl, uploadFile, s3CdnMap, type S3FileArgs } from '@futureporn/storage/s3.ts'
import patchVodInDatabase from "../fetchers/patchVodInDatabase";
import patchVodInDatabase from "@futureporn/fetchers/patchVodInDatabase";
import { configs } from "../config";
import { basename } from "path";
@ -21,14 +21,14 @@ export const generate_thumbnail: Task = async function (payload: unknown, helper
assertPayload(payload)
const { vod_id } = payload
if (!vod_id) throw new Error('generate_thumbnail Task was started without a vod_id. This is unsupported.');
helpers.logger.info(`generate_thumbnail started with vod_id=${vod_id}`);
helpers.logger.info(`🏗️ generate_thumbnail started with vod_id=${vod_id}`);
const vod = await getVod(vod_id, helpers)
const s3_file = vod?.s3_file
if (!s3_file) throw new Error(`vod ${vod_id} was missing a s3_file.`);
// we need to get a CDN url to the vod so we can download chunks of the file in order for Prevvy to create the storyboard image.
const cdnUrl = getCdnUrl(configs.s3Bucket, s3_file.s3_key)
const cdnUrl = getCdnUrl(configs.s3MainBucket, s3_file.s3_key)
const tmpImagePath = await getStoryboard(cdnUrl)
@ -38,7 +38,7 @@ export const generate_thumbnail: Task = async function (payload: unknown, helper
filePath: tmpImagePath,
s3AccessKeyId: configs.s3AccessKeyId,
s3SecretAccessKey: configs.s3SecretAccessKey,
s3BucketName: configs.s3Bucket,
s3BucketName: configs.s3MainBucket,
s3Endpoint: configs.s3Region,
s3Region: configs.s3Region
}
@ -47,7 +47,7 @@ export const generate_thumbnail: Task = async function (payload: unknown, helper
if (!upload.Key) throw new Error(`failed to upload ${tmpImagePath} to S3 (upload.Key was missing)`);
// we need to create a S3 file in the db
const thumbnail = getCdnUrl(configs.s3Bucket, upload.Key)
const thumbnail = getCdnUrl(configs.s3MainBucket, upload.Key)
await patchVodInDatabase(vod_id, { thumbnail })

@ -1,8 +1,8 @@
import type { Helpers, Task } from "graphile-worker"
import { configs } from "../config"
import type { Stream } from '@futureporn/types'
import createVod from "../fetchers/createVod"
import getVod from "../fetchers/getVod"
import createVod from "@futureporn/fetchers/createVod"
import getVod from "@futureporn/fetchers/getVod"
interface Payload {
vod_id: string;
@ -43,23 +43,24 @@ function assertPayload(payload: any): asserts payload is Payload {
const process_video: Task = async function (payload: unknown, helpers: Helpers) {
assertPayload(payload)
const { vod_id } = payload
helpers.logger.info(`process_video task has begun for vod_id=${vod_id}`)
helpers.logger.info(`🏗️ process_video task has begun for vod_id=${vod_id}`)
const vod = await getVod(vod_id, helpers)
if (!vod) throw new Error(`failed to get vod from database.`);
if (!vod.segments) throw new Error(`vod ${vod_id} fetched from database lacked any segments.`);
const maxAttempts = 6
const isCombinationNeeded = (vod.segments.length > 1)
if (isCombinationNeeded) {
const s3_manifest = vod.segments.map((segment) => ({ key: segment.s3_key, bytes: segment.bytes }))
helpers.logger.info(`There are ${vod.segments.length} segments; Concatenation is needed.`)
helpers.addJob('combine_video_segments', { s3_manifest, vod_id })
helpers.addJob('combine_video_segments', { s3_manifest, vod_id }, { maxAttempts })
} else {
helpers.addJob('remux_video', { vod_id })
helpers.addJob('remux_video', { vod_id }, { maxAttempts })
}
helpers.addJob('generate_thumbnail', { vod_id })
helpers.addJob('generate_thumbnail', { vod_id }, { maxAttempts })
// helpers.addJob('queue_moderator_review', { })
// helpers.addJob('create_mux_asset', { })
// helpers.addJob('create_torrent', { })

@ -1,11 +1,12 @@
import type { Helpers, Task } from "graphile-worker"
import { configs } from "../config"
import getVod from "../fetchers/getVod"
import getVod from "@futureporn/fetchers/getVod"
import { downloadFile, uploadFile, type S3FileArgs } from "@futureporn/storage/s3.ts"
import { remux } from '@futureporn/utils/video.ts'
import { getTmpFile } from "@futureporn/utils/file.ts"
import { basename } from "node:path"
import patchVodInDatabase from "../fetchers/patchVodInDatabase"
import patchVodInDatabase from "@futureporn/fetchers/patchVodInDatabase"
import { S3Client, S3ClientConfig } from "@aws-sdk/client-s3"
interface Payload {
vod_id: string;
@ -28,7 +29,7 @@ function assertPayload(payload: any): asserts payload is Payload {
const remux_video: Task = async function (payload: unknown, helpers: Helpers) {
assertPayload(payload)
const { vod_id } = payload
helpers.logger.info(`remux_video task has begun for vod_id=${vod_id}`)
helpers.logger.info(`🏗️ remux_video task has begun for vod_id=${vod_id}`)
const vod = await getVod(vod_id, helpers)
@ -46,15 +47,16 @@ const remux_video: Task = async function (payload: unknown, helpers: Helpers) {
}
const tmpFilePath = getTmpFile(segmentFileName)
const downloadArgs: S3FileArgs = {
filePath: tmpFilePath,
s3AccessKeyId: configs.s3AccessKeyId,
s3SecretAccessKey: configs.s3SecretAccessKey,
s3BucketName: configs.s3Bucket,
s3Endpoint: configs.s3Region,
s3Region: configs.s3Region
const s3ClientConfig: S3ClientConfig = {
credentials: {
accessKeyId: configs.s3AccessKeyId,
secretAccessKey: configs.s3SecretAccessKey,
},
endpoint: configs.s3Endpoint,
region: configs.s3Region,
}
await downloadFile(downloadArgs)
const client = new S3Client(s3ClientConfig)
await downloadFile(client, configs.s3UscBucket, segmentFileName)
helpers.logger.info('Remuxing the video')
const outputVideoPath = getTmpFile(`${basename(segmentFileName, '.ts')}.mp4`)
@ -65,7 +67,7 @@ const remux_video: Task = async function (payload: unknown, helpers: Helpers) {
filePath: outputVideoPath,
s3AccessKeyId: configs.s3AccessKeyId,
s3SecretAccessKey: configs.s3SecretAccessKey,
s3BucketName: configs.s3Bucket,
s3BucketName: configs.s3MainBucket,
s3Endpoint: configs.s3Region,
s3Region: configs.s3Region
}
@ -76,4 +78,6 @@ const remux_video: Task = async function (payload: unknown, helpers: Helpers) {
}
export default remux_video;

@ -1,7 +1,7 @@
import type { VtuberDataScrape } from "@futureporn/types"
import { fetchHtml, getBroadcasterDisplayName, getInitialRoomDossier } from './cb.ts'
import { getAccountData, usernameRegex } from "./fansly.ts"
import { fpSlugify } from "@futureporn/utils"
import { fpSlugify } from "@futureporn/utils/name.ts"
/**

@ -1,3 +0,0 @@
export async function greet(name) {
return `Hello, ${name}!`;
}

@ -1,42 +0,0 @@
import { Client, Connection } from '@temporalio/client';
import { example } from './temporal.workflow.js';
import { createId } from '@paralleldrive/cuid2';
async function run() {
// const cert = await fs.readFile('./path-to/your.pem');
// const key = await fs.readFile('./path-to/your.key');
let connectionOptions = {
address: 'temporal-frontend.futureporn.svc.cluster.local',
};
const connection = await Connection.connect(connectionOptions);
const client = new Client({
connection,
namespace: 'futureporn',
});
console.log('>>> WE ARE RUNNING THE WORKFLOW!!!!')
const handle = await client.workflow.start(example, {
taskQueue: 'hello-world',
// type inference works! args: [name: string]
args: ['Temporal'],
// in practice, use a meaningful business ID, like customerId or transactionId
workflowId: 'workflow-' + createId(),
});
console.log(`Started workflow ${handle.workflowId}`);
// optional: wait for client result
console.log(await handle.result()); // Hello, Temporal!
await client.connection.close();
}
run().catch((err) => {
console.error(err);
process.exit(1);
});

@ -1,38 +0,0 @@
import { NativeConnection, Worker } from '@temporalio/worker'
import * as activities from './temporal.activities.js'
import path from 'node:path'
async function run() {
// Step 1: Establish a connection with Temporal server.
//
// Worker code uses `@temporalio/worker.NativeConnection`.
// (But in your application code it's `@temporalio/client.Connection`.)
const connection = await NativeConnection.connect({
address: 'temporal-frontend.futureporn.svc.cluster.local',
// TLS and gRPC metadata configuration goes here.
});
// Step 2: Register Workflows and Activities with the Worker.
const worker = await Worker.create({
connection,
namespace: 'futureporn',
taskQueue: 'hello-world',
// Workflows are registered using a path as they run in a separate JS context.
workflowsPath: path.join(import.meta.dirname, './temporal.workflow.js'),
activities,
});
// Step 3: Start accepting tasks on the `hello-world` queue
//
// The worker runs until it encounters an unexpected error or the process receives a shutdown signal registered on
// the SDK Runtime object.
//
// By default, worker logs are written via the Runtime logger to STDERR at INFO level.
//
// See https://typescript.temporal.io/api/classes/worker.Runtime#install to customize these defaults.
await worker.run();
}
run().catch((err) => {
console.error(err);
process.exit(1);
})

@ -1,10 +0,0 @@
import { proxyActivities } from '@temporalio/workflow';
const { greet } = proxyActivities({
startToCloseTimeout: '1 minute',
});
/** A workflow that simply calls an activity */
export async function example(name) {
return await greet(name);
}

@ -31,7 +31,7 @@ export class RoomOfflineError extends Error {
export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = 0): Promise<string|null> {
console.log(`getPlaylistUrl roomUrl=${roomUrl}, proxy=${false}, retries=${retries}`)
console.log(`getPlaylistUrl roomUrl=${roomUrl} proxy=${false} retries=${retries}`)
let args = ['-g', roomUrl]
if (proxy) {
console.log(`proxy=${proxy}, HTTP_PROXY=${configs.httpProxy}`)
@ -49,7 +49,7 @@ export async function getPlaylistUrl (roomUrl: string, proxy = false, retries =
else throw new ExhaustedRetriesError()
} else if (code === 0 && output.match(/https:\/\/.*\.m3u8/)) {
// this must be an OK result with a playlist
return output
return output.trim()
} else if (code === 1 && output.match(/Room is currently offline/)) {
throw new RoomOfflineError()
} else {

@ -46,6 +46,6 @@
"node": "20.x.x",
"npm": ">=6.0.0"
},
"packageManager": "pnpm@9.5.0",
"packageManager": "pnpm@9.6.0",
"license": "MIT"
}