fp/factory can now concatenate
ci / build (push) Failing after 1s Details

This commit is contained in:
CJ_Clippy 2024-08-09 16:28:37 -08:00
parent 54572dbebe
commit 331e27138f
31 changed files with 3528 additions and 136 deletions

View File

@ -12,7 +12,7 @@ Tested on VKE v1.30.0+1 (PVCs on other versions may not be fulfilled)
direnv for loading .envrc direnv for loading .envrc
pg-boss for work queue, cron Graphile Worker for work queue, cron
Postgres for data storage Postgres for data storage

View File

@ -119,7 +119,7 @@ k8s_yaml(helm(
docker_build( docker_build(
'fp/strapi', 'fp/strapi',
'.', '.',
dockerfile='./d.strapi.dockerfile', dockerfile='./dockerfiles/strapi.dockerfile',
target='strapi', target='strapi',
only=[ only=[
'./.npmrc', './.npmrc',
@ -147,7 +147,7 @@ docker_build(
'./services/bot', './services/bot',
'./packages/types', './packages/types',
], ],
dockerfile='./d.bot.dockerfile', dockerfile='./dockerfiles/bot.dockerfile',
target='dev', target='dev',
live_update=[ live_update=[
sync('./services/bot', '/app/services/bot') sync('./services/bot', '/app/services/bot')
@ -199,13 +199,20 @@ cmd_button('pgadmin4:restore',
icon_name='hub', icon_name='hub',
text='import connection', text='import connection',
) )
cmd_button('factory:test',
argv=['./scripts/factory-test.sh'],
resource='factory',
icon_name='factory',
text='test',
)
## Uncomment the following for fp/next in dev mode ## Uncomment the following for fp/next in dev mode
## this is useful for changing the UI and seeing results ## this is useful for changing the UI and seeing results
docker_build( docker_build(
'fp/next', 'fp/next',
'.', '.',
dockerfile='d.next.dockerfile', dockerfile='dockerfiles/next.dockerfile',
target='next', target='next',
build_args={ build_args={
'NEXT_PUBLIC_STRAPI_URL': 'https://strapi.fp.sbtp.xyz' 'NEXT_PUBLIC_STRAPI_URL': 'https://strapi.fp.sbtp.xyz'
@ -216,11 +223,23 @@ docker_build(
pull=False, pull=False,
) )
docker_build(
'fp/factory',
'.',
dockerfile='./dockerfiles/factory.dockerfile',
target='dev',
live_update=[
sync('./services/factory', '/app/services/factory')
],
pull=False,
)
# docker_build( # docker_build(
# 'fp/scout', # 'fp/scout',
# '.', # '.',
# dockerfile='d.scout.dockerfile', # dockerfile='dockerfiles/scout.dockerfile',
# target='scout', # target='scout',
# live_update=[ # live_update=[
# sync('./packages/scout', '/app'), # sync('./packages/scout', '/app'),
@ -233,7 +252,7 @@ docker_build(
docker_build( docker_build(
'fp/mailbox', 'fp/mailbox',
'.', '.',
dockerfile='d.mailbox.dockerfile', dockerfile='dockerfiles/mailbox.dockerfile',
target='mailbox', target='mailbox',
only=[ only=[
'./.npmrc', './.npmrc',
@ -261,7 +280,7 @@ docker_build(
# docker_build( # docker_build(
# 'fp/meal', # 'fp/meal',
# '.', # '.',
# dockerfile='d.meal.dockerfile', # dockerfile='dockerfiles/meal.dockerfile',
# target='meal', # target='meal',
# only=[ # only=[
# './.npmrc', # './.npmrc',
@ -281,7 +300,7 @@ docker_build(
docker_build( docker_build(
'fp/capture', 'fp/capture',
'.', '.',
dockerfile='d.capture.dockerfile', dockerfile='dockerfiles/capture.dockerfile',
target='dev', target='dev',
only=[ only=[
'./.npmrc', './.npmrc',
@ -367,6 +386,11 @@ k8s_resource(
resource_deps=['postgresql-primary', 'strapi'], resource_deps=['postgresql-primary', 'strapi'],
labels=['backend'], labels=['backend'],
) )
k8s_resource(
workload='factory',
resource_deps=['postgrest'],
labels=['backend'],
)
# k8s_resource( # k8s_resource(

View File

@ -0,0 +1,57 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: factory
namespace: futureporn
labels:
app.kubernetes.io/name: factory
spec:
replicas: {{ .Values.factory.replicas }}
selector:
matchLabels:
app: factory
template:
metadata:
labels:
app: factory
spec:
containers:
- name: factory
image: "{{ .Values.factory.imageName }}"
env:
- name: WORKER_CONNECTION_STRING
valueFrom:
secretKeyRef:
name: postgrest
key: dbUri
- name: AUTOMATION_USER_JWT
valueFrom:
secretKeyRef:
name: postgrest
key: automationUserJwt
- name: POSTGREST_URL
value: "{{ .Values.postgrest.url }}"
- name: S3_ENDPOINT
value: "{{ .Values.s3.endpoint }}"
- name: S3_REGION
value: "{{ .Values.s3.region }}"
- name: S3_BUCKET
value: "{{ .Values.s3.buckets.usc }}"
- name: S3_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: capture
key: s3AccessKeyId
- name: S3_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: capture
key: s3SecretAccessKey
resources:
limits:
cpu: 250m
memory: 1Gi
restartPolicy: Always

View File

@ -32,6 +32,9 @@ mailbox:
cdnBucketUrl: https://fp-dev.b-cdn.net cdnBucketUrl: https://fp-dev.b-cdn.net
s3BucketName: fp-dev s3BucketName: fp-dev
port: 5000 port: 5000
factory:
replicas: 1
imageName: fp/factory
strapi: strapi:
replicas: 1 replicas: 1
imageName: fp/strapi imageName: fp/strapi

View File

@ -0,0 +1,67 @@
## d.factory.dockerfile
##
## @futureporn/factory is the system component which processes video segments into a VOD.
## Factory does tasks such as thumbnail generation, video encoding, file transfers, strapi record creation, etc.
FROM node:20 AS base
ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH"
WORKDIR /app
COPY --from=mwader/static-ffmpeg:7.0.2 /ffmpeg /usr/local/bin/
COPY --from=mwader/static-ffmpeg:7.0.2 /ffprobe /usr/local/bin/
RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
ENTRYPOINT ["pnpm"]
FROM base AS install
WORKDIR /app
RUN mkdir -p /app/services/factory && mkdir -p /prod/factory
## Copy manfiests, lockfiles, and configs into docker context
COPY package.json pnpm-lock.yaml .npmrc .
# COPY ./packages/image/pnpm-lock.yaml ./packages/image/package.json ./packages/image/
# COPY ./packages/scout/pnpm-lock.yaml ./packages/scout/package.json ./packages/scout/
# COPY ./packages/storage/pnpm-lock.yaml ./packages/storage/package.json ./packages/storage/
# COPY ./packages/utils/pnpm-lock.yaml ./packages/utils/package.json ./packages/utils/
COPY ./packages/types/pnpm-lock.yaml ./packages/types/package.json ./packages/types/
COPY ./services/factory/pnpm-lock.yaml ./services/factory/package.json ./services/factory/
## Install npm packages
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch
## we install node-gyp explicitly in order for sharp to install properly
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install -g node-gyp --prefer-offline
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --frozen-lockfile --prefer-offline
## Copy package code into docker context
# COPY ./packages/image/ ./packages/image/
# COPY ./packages/scout/ ./packages/scout/
# COPY ./packages/storage/ ./packages/storage/
# COPY ./packages/utils/ ./packages/utils/
COPY ./packages/types/ ./packages/types/
COPY ./services/factory/ ./services/factory/
# we are grabbing the mp4 files from capture so we can run tests with them
COPY ./services/capture/src/fixtures ./services/capture/src/fixtures
FROM install AS build
## Transpile TS into JS
## we have to build @futureporn/image first because other packages depend on it's built js files
## next we build everything else
# RUN pnpm --filter=@futureporn/image build
# RUN pnpm --filter=!@futureporn/image -r build
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm -r build
## Copy all production code into one place
## `pnpm deploy` copies all dependencies into an isolated node_modules directory inside the target dir
## @see https://pnpm.io/cli/deploy
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm deploy --filter=@futureporn/factory --prod /prod/factory
FROM install AS dev
WORKDIR /app/services/factory
RUN ls -lash
CMD ["run", "dev"]
FROM base AS factory
COPY --from=build /prod/factory .
RUN ls -la .
CMD ["start"]

View File

@ -0,0 +1,58 @@
FROM node:20-alpine3.18 AS base
## Installing libvips-dev for sharp Compatibility
## (only necessary for alpine docker images)
RUN apk update && apk add --no-cache build-base gcc autoconf automake zlib-dev libpng-dev nasm bash vips-dev git
RUN corepack enable && corepack prepare pnpm@9.5.0 --activate
ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH"
ARG NODE_ENV=development
ENV NODE_ENV=${NODE_ENV}
EXPOSE 1339
ENTRYPOINT ["pnpm"]
FROM base AS build
WORKDIR /app
RUN mkdir -p /prod/strapi
COPY pnpm-workspace.yaml pnpm-lock.yaml .npmrc package.json .
COPY ./packages/types ./packages/types
COPY ./packages/strapi ./packages/strapi
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch
# Do I need node-gyp?
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install -g node-gyp --prefer-offline
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --prefer-offline
RUN pnpm -r build
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm deploy --filter=strapi /prod/strapi
RUN ls -lah ./
RUN ls -lah ./packages
RUN ls -lah ./packages/strapi
RUN ls -lah /prod/strapi
# FROM base AS build
# RUN mkdir -p /prod/strapi
# WORKDIR /opt/
# COPY ./packages/strapi/package.json ./packages/strapi/pnpm-lock.yaml ./
# RUN pnpm fetch
# RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install -g node-gyp
# RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --frozen-lockfile
# ENV PATH /opt/node_modules/.bin:$PATH
# WORKDIR /opt/app
# COPY ./packages/strapi/. .
# RUN pnpm -r build
# RUN pnpm deploy --filter=strapi /prod/strapi
FROM base AS dev
COPY --from=build /prod/strapi .
CMD ["run", "develop"]
FROM base AS strapi
WORKDIR /opt/app
RUN chown -R node:node /opt/app
USER node
COPY --from=build /prod/strapi .
RUN ls -la .
CMD ["start"]

View File

@ -6,13 +6,7 @@
"scripts": { "scripts": {
"test": "echo \"Warn: no test specified\" && exit 0", "test": "echo \"Warn: no test specified\" && exit 0",
"clean": "rm -rf node_modules && rm -rf pnpm-lock.yaml", "clean": "rm -rf node_modules && rm -rf pnpm-lock.yaml",
"dev": "concurrently npm:dev.db npm:dev.bot npm:dev.capture.worker npm:dev.capture.api", "dev": "tilt up"
"dev.db": "docker run -i -e POSTGRES_PASSWORD=mysecretpassword -e POSTGRES_USER=william --rm -p 5435:5432 postgres:16",
"dev.capture.api": "dotenvx run -f .env.capture.api -- pnpm --filter=@futureporn/capture run dev",
"dev.capture.worker": "dotenvx run -f .env.capture.worker -- pnpm --filter=@futureporn/capture run dev",
"dev.bot": "dotenvx run -f .env.bot -- pnpm --filter=@futureporn/bot run dev",
"dev.strapi2": "dotenvx run -f .env.development -- pnpm --filter=@futureporn/strapi run build && pnpm --filter=@futureporn/strapi run dev",
"dev.strapi": ""
}, },
"keywords": [], "keywords": [],
"author": "@CJ_Clippy", "author": "@CJ_Clippy",

View File

@ -4,132 +4,147 @@ export as namespace Futureporn;
declare namespace Futureporn { declare namespace Futureporn {
type RecordingState = 'pending' | 'recording' | 'aborted' | 'ended' interface RecordingRecord {
id: number;
recordingState: RecordingState;
fileSize: number;
discordMessageId: string;
isAborted: boolean;
}
interface RawRecordingRecord {
id: number;
recording_state: RecordingState;
file_size: number;
discord_message_id: string;
is_aborted: boolean;
}
type RecordingState = 'pending' | 'recording' | 'aborted' | 'ended'
interface IMuxAsset { interface IMuxAsset {
id: number; id: number;
attributes: { attributes: {
playbackId: string; playbackId: string;
assetId: string; assetId: string;
}
} }
}
interface IPagination { interface IPagination {
page: number; page: number;
pageSize: number; pageSize: number;
pageCount: number; pageCount: number;
total: number; total: number;
}
interface IMuxAssetResponse {
data: IMuxAsset;
meta: IMeta;
}
interface IMeta {
pagination: IPagination;
}
interface IPlatformNotification {
id: number;
attributes: {
source: string;
platform: string;
date: string;
date2: string;
vtuber: number;
} }
}
interface IMuxAssetResponse { interface IPlatformNotificationResponse {
data: IMuxAsset; data: IPlatformNotification;
meta: IMeta; meta: IMeta;
error?: any;
}
interface IStream {
id: number;
attributes: {
date: string;
date2: string;
archiveStatus: 'good' | 'issue' | 'missing';
vods: IVodsResponse;
cuid: string;
vtuber: IVtuberResponse;
tweet: ITweetResponse;
isChaturbateStream: boolean;
isFanslyStream: boolean;
platformNotifications: IPlatformNotification[];
} }
}
interface IMeta { interface IStreamResponse {
pagination: IPagination; data: IStream;
meta: IMeta;
error?: any;
}
interface IStreamsResponse {
data: IStream[];
meta: IMeta;
}
interface IVtuber {
id: number;
attributes: {
slug: string;
displayName: string;
chaturbate?: string;
twitter?: string;
patreon?: string;
twitch?: string;
tiktok?: string;
onlyfans?: string;
youtube?: string;
linktree?: string;
carrd?: string;
fansly?: string;
pornhub?: string;
discord?: string;
reddit?: string;
throne?: string;
instagram?: string;
facebook?: string;
merch?: string;
vods: IVod[];
description1: string;
description2?: string;
image: string;
imageBlur?: string;
themeColor: string;
fanslyId?: string;
chaturbateId?: string;
twitterId?: string;
} }
}
interface IVtuberResponse {
data: IVtuber;
meta: IMeta;
}
interface IVtubersResponse {
data: IVtuber[];
meta: IMeta;
}
interface IPlatformNotification { type NotificationData = {
id: number; isMatch: boolean;
attributes: { url?: string;
source: string; platform?: string;
platform: string; channel?: string;
date: string; displayName?: string;
date2: string; date?: string;
vtuber: number; userId?: string | null;
} avatar?: string;
} };
interface IPlatformNotificationResponse {
data: IPlatformNotification;
meta: IMeta;
error?: any;
}
interface IStream {
id: number;
attributes: {
date: string;
date2: string;
archiveStatus: 'good' | 'issue' | 'missing';
vods: IVodsResponse;
cuid: string;
vtuber: IVtuberResponse;
tweet: ITweetResponse;
isChaturbateStream: boolean;
isFanslyStream: boolean;
platformNotifications: IPlatformNotification[];
}
}
interface IStreamResponse {
data: IStream;
meta: IMeta;
error?: any;
}
interface IStreamsResponse {
data: IStream[];
meta: IMeta;
}
interface IVtuber {
id: number;
attributes: {
slug: string;
displayName: string;
chaturbate?: string;
twitter?: string;
patreon?: string;
twitch?: string;
tiktok?: string;
onlyfans?: string;
youtube?: string;
linktree?: string;
carrd?: string;
fansly?: string;
pornhub?: string;
discord?: string;
reddit?: string;
throne?: string;
instagram?: string;
facebook?: string;
merch?: string;
vods: IVod[];
description1: string;
description2?: string;
image: string;
imageBlur?: string;
themeColor: string;
fanslyId?: string;
chaturbateId?: string;
twitterId?: string;
}
}
interface IVtuberResponse {
data: IVtuber;
meta: IMeta;
}
interface IVtubersResponse {
data: IVtuber[];
meta: IMeta;
}
type NotificationData = {
isMatch: boolean;
url?: string;
platform?: string;
channel?: string;
displayName?: string;
date?: string;
userId?: string | null;
avatar?: string;
};
} }

21
scripts/factory-test.sh Executable file
View File

@ -0,0 +1,21 @@
#!/bin/bash
postgres_pod_name=postgresql-primary-0
if [ -z $POSTGRES_PASSWORD ]; then
echo "POSTGRES_PASSWORD was missing in env. Are you executing this script via Tilt? (that is the intended method)"
exit 5
fi
kubectl -n futureporn exec "${postgres_pod_name}" -- env PGPASSWORD=${POSTGRES_PASSWORD} psql -U postgres -d futureporn --command "
SELECT graphile_worker.add_job(
'combine_video_segments',
payload := json_build_object(
's3_manifest', json_build_array(
json_build_object('id', '4_z7d53875ff1c32a1983d30b18_f118989ca296359da_d20240809_m205858_c000_v0001408_t0033_u01723237138038', 'key', 'mock-stream0.mp4'),
json_build_object('id', '4_z7d53875ff1c32a1983d30b18_f107c0649cef835e4_d20240809_m205859_c000_v0001406_t0016_u01723237139170', 'key', 'mock-stream1.mp4'),
json_build_object('id', '4_z7d53875ff1c32a1983d30b18_f10651c62f4ca1b2f_d20240809_m205900_c000_v0001076_t0022_u01723237140217', 'key', 'mock-stream2.mp4')
)
),
max_attempts := 3
);"

18
services/bot/src/crontab Normal file
View File

@ -0,0 +1,18 @@
# @see https://worker.graphile.org/docs/cron
#
# ┌───────────── UTC minute (0 - 59)
# │ ┌───────────── UTC hour (0 - 23)
# │ │ ┌───────────── UTC day of the month (1 - 31)
# │ │ │ ┌───────────── UTC month (1 - 12)
# │ │ │ │ ┌───────────── UTC day of the week (0 - 6) (Sunday to Saturday)
# │ │ │ │ │ ┌───────────── task (identifier) to schedule
# │ │ │ │ │ │ ┌────────── optional scheduling options
# │ │ │ │ │ │ │ ┌────── optional payload to merge
# │ │ │ │ │ │ │ │
# │ │ │ │ │ │ │ │
# * * * * * task ?opts {payload}
## every 5 minutes, we see which /records are stale and we mark them as such.
## this prevents stalled Record updates by marking stalled recordings as stopped
*/5 * * * * expire_records

View File

@ -132,7 +132,7 @@ export default class Record {
parallelUploads3.on("httpUploadProgress", (progress) => { parallelUploads3.on("httpUploadProgress", (progress) => {
if (progress?.loaded) { if (progress?.loaded) {
if (this.onProgress) this.onProgress(this.counter); if (this.onProgress) this.onProgress(this.counter);
console.log(`uploaded ${progress.loaded} bytes (${prettyBytes(progress.loaded)})`); // console.log(`uploaded ${progress.loaded} bytes (${prettyBytes(progress.loaded)})`);
} else { } else {
console.log(`httpUploadProgress ${JSON.stringify(progress, null, 2)}`) console.log(`httpUploadProgress ${JSON.stringify(progress, null, 2)}`)
} }
@ -203,8 +203,6 @@ export default class Record {
} }
) )
// await this.saveToDisk()
console.log('awaiting uploadToS3()...') console.log('awaiting uploadToS3()...')
await this.uploadToS3() await this.uploadToS3()
console.log('uploadToS3() is complete.') console.log('uploadToS3() is complete.')

View File

@ -58,7 +58,9 @@ async function getRecording(url: string, recordId: number, helpers: Helpers) {
const playlistUrl = await getPlaylistUrl(url) const playlistUrl = await getPlaylistUrl(url)
const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint }) const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint })
const inputStream = Record.getFFmpegStream({ url: playlistUrl }) const inputStream = Record.getFFmpegStream({ url: playlistUrl })
const onProgress = (fileSize: number) => { helpers.logger.info(`onProgress() has fired~! fileSize=${fileSize}`); updateDatabaseRecord({ recordId, recordingState: 'recording', fileSize }).then(checkIfAborted).then((isAborted) => isAborted ? abortController.abort() : null) } const onProgress = (fileSize: number) => {
updateDatabaseRecord({ recordId, recordingState: 'recording', fileSize }).then(checkIfAborted).then((isAborted) => isAborted ? abortController.abort() : null)
}
const record = new Record({ inputStream, onProgress, bucket, s3Client, jobId: ''+recordId, abortSignal }) const record = new Record({ inputStream, onProgress, bucket, s3Client, jobId: ''+recordId, abortSignal })
return record return record
} }
@ -76,7 +78,7 @@ async function updateDatabaseRecord({
recordingState: RecordingState, recordingState: RecordingState,
fileSize: number fileSize: number
}): Promise<RawRecordingRecord> { }): Promise<RawRecordingRecord> {
console.log(`updating database record with recordId=${recordId}, recordingState=${recordingState}, fileSize=${fileSize}`) // console.log(`updating database record with recordId=${recordId}, recordingState=${recordingState}, fileSize=${fileSize}`)
const payload: any = { const payload: any = {
file_size: fileSize file_size: fileSize
} }

View File

@ -0,0 +1,5 @@
# @futureporn/factory
Factory takes raw materials (video segments) and produces an end product (encoded video, thumbnail)
factory has a big disk and lots of RAM in order to do transcoding tasks

View File

@ -0,0 +1,39 @@
{
"name": "@futureporn/factory",
"type": "module",
"version": "1.0.0",
"description": "",
"main": "src/index.ts",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"dev": "pnpm run dev.nodemon # yes this is crazy to have nodemon execute tsx, but it's the only way I have found to get live reloading in TS/ESM/docker with Graphile Worker's way of loading tasks",
"dev.tsx": "tsx ./src/index.ts",
"dev.nodemon": "nodemon --ext ts --exec \"pnpm run dev.tsx\"",
"dev.node": "node --no-warnings=ExperimentalWarning --loader ts-node/esm src/index.ts"
},
"keywords": [
"transcode",
"transcoding",
"process",
"processing"
],
"author": "@cj_clippy",
"license": "Unlicense",
"dependencies": {
"@aws-sdk/client-s3": "^3.627.0",
"@aws-sdk/lib-storage": "^3.588.0",
"@paralleldrive/cuid2": "^2.2.2",
"@types/node": "^22.1.0",
"dotenv": "^16.4.5",
"fluture": "^14.0.0",
"graphile-worker": "^0.16.6",
"ramda": "^0.30.1"
},
"devDependencies": {
"@futureporn/types": "workspace:^",
"@types/ramda": "^0.30.1",
"nodemon": "^3.1.4",
"ts-node": "^10.9.2",
"tsx": "^4.17.0"
}
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,46 @@
import { config } from 'dotenv'
import { join, dirname } from 'node:path'
import { fileURLToPath } from 'node:url';
const __dirname = dirname(fileURLToPath(import.meta.url));
config({ path: join(__dirname, '../../../.env.development') })
if (!process.env.WORKER_CONNECTION_STRING) throw new Error("WORKER_CONNECTION_STRING was missing from env");
if (!process.env.POSTGREST_URL) throw new Error('Missing POSTGREST_URL env var');
if (!process.env.AUTOMATION_USER_JWT) throw new Error('Missing AUTOMATION_USER_JWT env var');
if (!process.env.S3_ACCESS_KEY_ID) throw new Error('Missing S3_ACCESS_KEY_ID env var');
if (!process.env.S3_SECRET_ACCESS_KEY) throw new Error('Missing S3_BUCKET_APPLICATION_KEY env var');
if (!process.env.S3_REGION) throw new Error('Missing S3_REGION env var');
if (!process.env.S3_ENDPOINT) throw new Error('Missing S3_REGION env var');
if (!process.env.S3_BUCKET) throw new Error('Missing S3_BUCKET env var');
const postgrestUrl = process.env.POSTGREST_URL!
const automationUserJwt = process.env.AUTOMATION_USER_JWT!
const connectionString = process.env.WORKER_CONNECTION_STRING!
const s3AccessKeyId = process.env.S3_ACCESS_KEY_ID!
const s3Region = process.env.S3_REGION!
const s3Endpoint = process.env.S3_ENDPOINT!
const s3SecretAccessKey = process.env.S3_SECRET_ACCESS_KEY!
const s3Bucket = process.env.S3_BUCKET!
export interface Config {
postgrestUrl: string;
automationUserJwt: string;
connectionString: string;
s3AccessKeyId: string;
s3SecretAccessKey: string;
s3Region: string;
s3Endpoint: string;
s3Bucket: string;
}
export const configs: Config = {
postgrestUrl,
automationUserJwt,
connectionString,
s3AccessKeyId,
s3SecretAccessKey,
s3Endpoint,
s3Region,
s3Bucket,
}

View File

@ -0,0 +1,47 @@
import type { RunnerOptions, GraphileConfig } from 'graphile-worker'
import { run } from 'graphile-worker'
import { join, dirname } from 'node:path'
import { fileURLToPath } from 'url'
import { configs } from './config.ts'
const __dirname = dirname(fileURLToPath(import.meta.url));
async function setupGraphileWorker() {
try {
const preset: GraphileConfig.Preset = {
worker: {
connectionString: configs.connectionString,
concurrentJobs: 3,
fileExtensions: [".js", ".ts"],
taskDirectory: join(__dirname, 'tasks')
},
};
console.log('worker preset as follows')
console.log(preset)
const runnerOptions: RunnerOptions = {
preset
}
const runner = await run(runnerOptions)
if (!runner) throw new Error('failed to initialize graphile worker');
await runner.promise
} catch (e) {
console.error('error caught during setupGraphileWorker')
console.error(e)
}
}
async function main() {
await setupGraphileWorker()
}
main().catch((e) => {
console.error("error during main() function")
console.error(e)
process.exit(3)
})

View File

@ -0,0 +1,303 @@
import { Helpers, type Task } from 'graphile-worker'
import { basename, join } from 'node:path';
import { S3Client, GetObjectCommand } from "@aws-sdk/client-s3"
import { Upload } from "@aws-sdk/lib-storage"
import { createId } from '@paralleldrive/cuid2';
// import { downloadS3File, uploadToS3, createStrapiB2File, createStrapiVod, createStrapiStream } from '@futureporn/storage'
// import { concatenateVideoSegments } from '@futureporn/video'
import { execFile } from 'node:child_process';
import { configs } from '../config';
import { pipeline, PassThrough, Readable } from 'node:stream';
import { createReadStream, createWriteStream, write } from 'node:fs';
import { writeFile, readFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { promisify } from 'node:util';
interface s3File {
key: string;
id: string;
}
interface Payload {
s3_manifest: s3File[];
}
interface S3Target {
Bucket: string;
Key: string;
Body: Readable;
}
interface S3UploadParameters {
bucket: string;
keyName: string;
uploadStream: PassThrough;
client: S3Client;
onProgress?: Function;
}
function assertPayload(payload: any): asserts payload is Payload {
if (typeof payload !== "object" || !payload) throw new Error("invalid payload");
if (typeof payload.s3_manifest !== "object") throw new Error("invalid s3_manifest");
}
const downloadS3File = async function (client: S3Client, s3File: s3File): Promise<string> {
const bucket = configs.s3Bucket;
const { key, id } = s3File
console.log(`downloadS3File with s3File key=${key}, bucket=${bucket}`)
const getObjectCommand = new GetObjectCommand({ Bucket: bucket, Key: key })
const response = await client.send(getObjectCommand)
if (!response) throw new Error(`failed to receive a response while calling S3 GetObjectCommand (within downloadS3File)`);
if (!response.Body) throw new Error('S3 GetObjectCommand response did not have a Body (within downloadS3File)');
const readStream = response.Body as Readable
const outputFilePath = join(tmpdir(), key)
const writeStream = createWriteStream(outputFilePath)
const pipelinePromise = promisify(pipeline)
await pipelinePromise(readStream, writeStream)
return outputFilePath
}
/**
*
* generate a txt file on disk which ffmpeg's concat filter will use to concatenate files.
* example: ffmpeg -f concat -safe 0 -i ./files.txt -c copy ./projektmelody-chaturbate-2023-07-18.mp4
* the text file is written to os.tmpdir()
* the text file contents looks like the following.
*
* file './cb-recording-part-1.mp4'
* file './cb-recording-part-2.mp4'
* file './cb-recording-part-3.mp4'
*/
const getFFmpegConcatSpecFile = async function (inputVideoFilePaths: string[]): Promise<string> {
if (!inputVideoFilePaths) throw new Error('getFFmpegConcatSpec file requires an array of filepaths as argument, but it was undefined');
if (inputVideoFilePaths.length < 1) throw new Error('getFFmpegConcatSpec arg0, inputVideoFilePaths was length 0 which is unsupported.');
let lines = []
for (const filePath of inputVideoFilePaths) {
lines.push(`file '${filePath}'`)
}
const specFilePath = join(tmpdir(), `concat-spec-${createId()}`)
await writeFile(specFilePath, lines.join('\n'), { encoding: 'utf-8' })
return specFilePath
}
const getFFmpegConcatenation = async function (specFilePath: string, outputPath: string) {
if (!specFilePath) throw new Error('getFFmpegStream requires specFilePath as arg');
const execFileP = promisify(execFile)
const { stdout, stderr } = await execFileP('ffmpeg', [
'-f', 'concat',
'-safe', '0',
'-i', specFilePath,
'-c', 'copy',
outputPath
])
console.log(stdout)
console.log(stderr)
return outputPath
}
const concatVideos = async function (videoFilePaths: string[]): Promise<string> {
console.log(`concatVideos with ${JSON.stringify(videoFilePaths)}`)
if (!videoFilePaths || videoFilePaths.length < 1 || typeof videoFilePaths[0] !== 'string') throw new Error('concatVideos requires videoFilePaths as arg, but it was undefined or not an array of strings');
if (videoFilePaths.length === 1) {
// if there is only one video, we don't need to do anything.
return videoFilePaths[0]
}
const concatSpec = await getFFmpegConcatSpecFile(videoFilePaths)
const outputVideoPath = join(tmpdir(), `${basename(videoFilePaths[0], '.mp4')}-${createId()}.mp4`)
try {
await getFFmpegConcatenation(concatSpec, outputVideoPath)
} catch (err) {
console.error(`error encountered while concatenating video files together`)
console.error(err)
throw err
}
// const outputStream = createWriteStream(outputVideoPath)
// console.log(`writing concatenated video to ${outputVideoPath}`)
// const pipelinePromise = promisify(pipeline)
// await pipelinePromise(ffmpegStream, outputStream)
return outputVideoPath
}
// const uploadToS3 = async function ({ bucket, keyName, uploadStream, client, onProgress }: S3UploadParameters) {
// let counter = 0;
// const target = {
// Bucket: bucket,
// Key: keyName,
// Body: uploadStream
// }
// // greets https://stackoverflow.com/a/70159394/1004931
// try {
// const parallelUploads3 = new Upload({
// client,
// partSize: 1024 * 1024 * 5,
// queueSize: 1,
// leavePartsOnError: false,
// params: target,
// });
// parallelUploads3.on("httpUploadProgress", (progress) => {
// if (progress?.loaded) {
// if (onProgress) onProgress(counter);
// // console.log(`uploaded ${progress.loaded} bytes (${prettyBytes(progress.loaded)})`);
// } else {
// console.log(`httpUploadProgress ${JSON.stringify(progress, null, 2)}`)
// }
// });
// console.log('Waiting for parallelUploads3 to finish...')
// await parallelUploads3.done();
// console.log('parallelUploads3 is complete.')
// } catch (e) {
// if (e instanceof Error) {
// console.error(`We were uploading a file to S3 but then we encountered an error! ${JSON.stringify(e, null, 2)}`)
// throw e
// } else {
// throw new Error(`error of some sort ${JSON.stringify(e, null, 2)}`)
// }
// }
// }
const setupUploadPipeline = function ({ inputStream, uploadStream }: { inputStream: Readable, uploadStream: PassThrough }) {
pipeline(
inputStream,
uploadStream,
(err: any) => {
if (err) {
console.error(`upload pipeline errored.`)
console.error(err)
} else {
console.log('upload pipeline succeeded.')
}
}
)
}
const getS3ParallelUpload = async function ({
filePath,
client,
s3KeyName
}: {
s3KeyName: string,
filePath: string,
client: S3Client,
}): Promise<{upload: Upload, uploadStream: PassThrough}> {
if (!filePath) throw new Error("first argument passed to uploadToS3, 'filePath' is undefined");
console.log(`uploading ${s3KeyName} to S3`)
const uploadStream = new PassThrough()
const target: S3Target = {
Bucket: configs.s3Bucket,
Key: s3KeyName,
Body: uploadStream
}
const upload = new Upload({
client,
partSize: 1024 * 1024 * 5,
queueSize: 1,
leavePartsOnError: false,
params: target,
});
return { upload, uploadStream }
}
const createStrapiB2File = async function (): Promise<number> {
}
const createStrapiStream = async function (): Promise<number> {
}
export const combine_video_segments: Task = async function (payload: unknown, helpers: Helpers) {
// helpers.logger.info('the following is the raw Task payload')
// helpers.logger.info(payload)
// helpers.logger.info(JSON.stringify(payload?.s3_manifest))
assertPayload(payload)
const { s3_manifest } = payload
helpers.logger.info(`combine_video_segments started with s3_manifest=${JSON.stringify(s3_manifest)}`)
/**
* Here we take a manifest of S3 files and we download each of them.
* Then we combine them all, preserving original order using `ffmpeg -f concat`
* Then we upload the resulting video to S3
* Then we create records in Strapi
* * B2 file
* * VOD
* * Stream(?)
*/
// const downloadTasks = s3_manifest.map(file => downloadS3File(client, file));
try {
const client = new S3Client({
endpoint: configs.s3Endpoint,
region: configs.s3Region,
credentials: {
accessKeyId: configs.s3AccessKeyId,
secretAccessKey: configs.s3SecretAccessKey
}
});
const s3Manifest = s3_manifest
const inputVideoFilePaths = await Promise.all(s3Manifest.map((m) => downloadS3File(client, m)))
const concatenatedVideoFile = await concatVideos(inputVideoFilePaths)
const s3KeyName = basename(concatenatedVideoFile)
const inputStream = createReadStream(concatenatedVideoFile)
const filePath = concatenatedVideoFile
const { uploadStream, upload } = await getS3ParallelUpload({ client, s3KeyName, filePath })
setupUploadPipeline({ inputStream, uploadStream })
await upload.done()
} catch (e: any) {
helpers.logger.error('combined_video_segments failed')
if (e instanceof Error) {
helpers.logger.error(e.message)
} else {
helpers.logger.error(e)
}
}
// const program = pipe(
// parallel(3)(downloadTasks), // Download files in parallel, with a concurrency limit of 3
// chain(concatVideos), // Concatenate videos
// chain((outputPath) => uploadToS3(client, outputPath)), // Upload to S3
// chain((s3Url) => createStrapiRecord({ videoUrl: s3Url })) // Create Strapi record
// );
// program.fork(
// (error) => {
// helpers.logger.error(`Failed to process video segments: ${error.message}`);
// throw error;
// },
// (result) => {
// helpers.logger.info('Successfully processed video segments');
// return result;
// }
// );
// fork (log ('rejection'))
// (log('resolution'))
// (both (after (200) ('left')) (after (300) ('right')))
}
export default combine_video_segments

View File

@ -0,0 +1,22 @@
// greetz https://github.com/discordeno/discordeno/blob/main/examples/advanced/src/utils/loader.ts
import { readdir } from 'node:fs/promises'
import { join } from 'node:path'
export async function importDirectory(folder: string): Promise<void> {
const files = await readdir(folder, { recursive: true })
// bot.logger.info(files)
for (const filename of files) {
if (!filename.endsWith('.js') && !filename.endsWith('.ts')) continue
console.log(`loading ${filename}`)
// Using `file://` and `process.cwd()` to avoid weird issues with relative paths and/or Windows
// await import(`file://${process.cwd()}/${folder}/${filename}`).catch((x) =>
await import(join(folder, filename)).catch((x) =>
// console.error(x)
console.error(`cannot import ${filename} for reason: ${x}`)
// logger.fatal(`Cannot import file (${folder}/${filename}) for reason: ${x}`),
)
}
}

View File

@ -0,0 +1,33 @@
{
"compilerOptions": {
// Base Options recommended for all projects
"allowImportingTsExtensions": true,
"noEmit": true, // tsup does the emissions
"esModuleInterop": true,
"skipLibCheck": true,
"target": "ESNext",
"allowJs": true,
"moduleResolution": "Bundler",
"resolveJsonModule": true,
"moduleDetection": "force",
"isolatedModules": true,
// Enable strict type checking so you can catch bugs early
"strict": true,
"noUncheckedIndexedAccess": true,
"noImplicitOverride": true,
// Transpile our TypeScript code to JavaScript
"module": "ESNext",
"outDir": "dist",
"lib": [
"ESNext",
"dom"
]
},
// Include the necessary files for your project
"include": [
"src/**/*.ts"
],
"exclude": [
"node_modules"
]
}