From 4d65294f7d90f9a006bea068cc9df9637981c1e4 Mon Sep 17 00:00:00 2001 From: CJ_Clippy Date: Thu, 5 Sep 2024 21:39:08 -0800 Subject: [PATCH] move fetchers to their own package --- ARCHITECHTURE.md | 4 + Tiltfile | 6 +- charts/fp/templates/capture.yaml | 4 +- charts/fp/values.yaml | 4 +- devbox.json | 3 +- devbox.lock | 108 +++++++++ dockerfiles/bot.dockerfile | 1 + dockerfiles/capture.dockerfile | 2 + dockerfiles/factory.dockerfile | 2 + dockerfiles/next.dockerfile | 1 + dockerfiles/scout.dockerfile | 1 + dockerfiles/worker.dockerfile | 62 ----- packages/fetchers/README.md | 5 + packages/fetchers/package.json | 23 ++ packages/fetchers/pnpm-lock.yaml | 77 +++++++ packages/fetchers/src/config.ts | 27 +++ .../fetchers/src}/createS3File.ts | 2 +- .../fetchers/src}/createSegmentInDatabase.ts | 11 +- .../fetchers/src}/createSegmentsVodLink.ts | 10 +- .../fetchers/src}/createStreamInDatabase.ts | 2 +- .../fetchers/src}/createVod.ts | 8 +- .../fetchers/src}/findOrCreateStream.ts | 4 +- .../fetchers/src}/findOrCreateVtuber.spec.ts | 0 .../fetchers/src}/findOrCreateVtuber.ts | 4 +- .../fetchers/src}/findVod.ts | 13 +- .../fetchers/src}/getPlaylistUrl.ts | 2 +- .../fetchers/src}/getSegmentsFromDatabase.ts | 12 +- .../fetchers/src}/getStreamFromDatabase.ts | 4 +- .../fetchers/src}/getStreamIdFromMessage.ts | 0 .../fetchers/src}/getUrlFromMessage.ts | 2 +- .../fetchers/src}/getVod.ts | 12 +- .../fetchers/src}/patchVod.ts | 2 +- .../fetchers/src}/patchVodInDatabase.ts | 2 +- .../fetchers/src}/updateSegmentInDatabase.ts | 15 +- packages/storage/package.json | 2 +- packages/storage/pnpm-lock.yaml | 2 +- packages/types/src/index.ts | 6 +- packages/utils/src/name.ts | 4 +- services/bot/crontab | 2 +- services/bot/package.json | 1 + services/bot/pnpm-lock.yaml | 3 + services/bot/src/commands/process.ts | 6 +- services/bot/src/commands/record.ts | 8 +- services/bot/src/fetchers/createVod.ts | 28 --- .../bot/src/tasks/update_discord_message.ts | 11 +- .../bot/src/tasks/update_vod_statuses.test.ts | 11 + services/bot/src/tasks/update_vod_statuses.ts | 16 +- services/bot/tsconfig.json | 2 +- services/capture/package.json | 4 +- services/capture/pnpm-lock.yaml | 13 ++ services/capture/src/Record.ts | 80 ++++--- .../capture/src/RecordNextGeneration.spec.ts | 15 ++ services/capture/src/RecordNextGeneration.ts | 213 ++++++++++++++++++ services/capture/src/app.spec.ts | 6 +- services/capture/src/config.ts | 6 +- services/capture/src/poc.ts | 1 - services/capture/src/tasks/record.ts | 191 +++++++++------- services/factory/package.json | 1 + services/factory/pnpm-lock.yaml | 3 + .../src/tasks/combine_video_segments.ts | 31 +-- .../factory/src/tasks/generate_thumbnail.ts | 12 +- services/factory/src/tasks/process_video.ts | 13 +- services/factory/src/tasks/remux_video.ts | 28 ++- services/scout/src/scrapeVtuberData.ts | 2 +- services/scout/src/temporal.activities.js | 3 - services/scout/src/temporal.client.js | 42 ---- services/scout/src/temporal.worker.js | 38 ---- services/scout/src/temporal.workflow.js | 10 - services/scout/src/ytdlp.ts | 4 +- services/strapi/package.json | 2 +- 70 files changed, 812 insertions(+), 443 deletions(-) delete mode 100644 dockerfiles/worker.dockerfile create mode 100644 packages/fetchers/README.md create mode 100644 packages/fetchers/package.json create mode 100644 packages/fetchers/pnpm-lock.yaml create mode 100644 packages/fetchers/src/config.ts rename {services/factory/src/fetchers => packages/fetchers/src}/createS3File.ts (94%) rename {services/capture/src/fetchers => packages/fetchers/src}/createSegmentInDatabase.ts (80%) rename {services/capture/src/fetchers => packages/fetchers/src}/createSegmentsVodLink.ts (84%) rename {services/bot/src/fetchers => packages/fetchers/src}/createStreamInDatabase.ts (93%) rename {services/factory/src/fetchers => packages/fetchers/src}/createVod.ts (82%) rename {services/bot/src/fetchers => packages/fetchers/src}/findOrCreateStream.ts (95%) rename {services/bot/src/fetchers => packages/fetchers/src}/findOrCreateVtuber.spec.ts (100%) rename {services/bot/src/fetchers => packages/fetchers/src}/findOrCreateVtuber.ts (96%) rename {services/bot/src/fetchers => packages/fetchers/src}/findVod.ts (78%) rename {services/capture/src/fetchers => packages/fetchers/src}/getPlaylistUrl.ts (90%) rename {services/capture/src/fetchers => packages/fetchers/src}/getSegmentsFromDatabase.ts (78%) rename {services/bot/src/fetchers => packages/fetchers/src}/getStreamFromDatabase.ts (90%) rename {services/bot/src/fetchers => packages/fetchers/src}/getStreamIdFromMessage.ts (100%) rename {services/bot/src/fetchers => packages/fetchers/src}/getUrlFromMessage.ts (94%) rename {services/factory/src/fetchers => packages/fetchers/src}/getVod.ts (62%) rename {services/bot/src/fetchers => packages/fetchers/src}/patchVod.ts (92%) rename {services/factory/src/fetchers => packages/fetchers/src}/patchVodInDatabase.ts (91%) rename {services/capture/src/fetchers => packages/fetchers/src}/updateSegmentInDatabase.ts (76%) delete mode 100644 services/bot/src/fetchers/createVod.ts create mode 100644 services/bot/src/tasks/update_vod_statuses.test.ts create mode 100644 services/capture/src/RecordNextGeneration.spec.ts create mode 100644 services/capture/src/RecordNextGeneration.ts delete mode 100644 services/scout/src/temporal.activities.js delete mode 100644 services/scout/src/temporal.client.js delete mode 100644 services/scout/src/temporal.worker.js delete mode 100644 services/scout/src/temporal.workflow.js diff --git a/ARCHITECHTURE.md b/ARCHITECHTURE.md index 3603e01..18ef3b1 100644 --- a/ARCHITECHTURE.md +++ b/ARCHITECHTURE.md @@ -10,6 +10,10 @@ Kubernetes for Production, deployed using FluxCD Tested on VKE v1.30.0+1 (PVCs on other versions may not be fulfilled) +devbox for shareable development environment + +ggshield for preventing git commits containing secrets + direnv for loading .envrc Graphile Worker for work queue, cron diff --git a/Tiltfile b/Tiltfile index 687dc3c..88b2483 100644 --- a/Tiltfile +++ b/Tiltfile @@ -184,6 +184,7 @@ docker_build( './services/bot', './packages/types', './packages/utils', + './packages/fetchers', ], dockerfile='./dockerfiles/bot.dockerfile', target='dev', @@ -202,6 +203,7 @@ docker_build( './pnpm-workspace.yaml', './packages/types', './packages/utils', + './packages/fetchers', './services/scout', ], dockerfile='./dockerfiles/scout.dockerfile', @@ -324,6 +326,7 @@ docker_build( './services/mailbox', './packages/types', './packages/utils', + './packages/fetchers', './packages/video', './packages/storage', ], @@ -350,6 +353,7 @@ docker_build( './pnpm-workspace.yaml', './packages/types', './packages/utils', + './packages/fetchers', './services/capture', ], live_update=[ @@ -385,7 +389,7 @@ docker_build( k8s_resource( workload='scout', resource_deps=['postgresql-primary'], - port_forwards=['5134'], + port_forwards=['8134'], labels=['backend'], ) k8s_resource( diff --git a/charts/fp/templates/capture.yaml b/charts/fp/templates/capture.yaml index 81e0804..43cce9b 100644 --- a/charts/fp/templates/capture.yaml +++ b/charts/fp/templates/capture.yaml @@ -40,6 +40,8 @@ spec: - name: capture-worker image: "{{ .Values.capture.imageName }}" env: + # - name: NODE_DEBUG + # value: "stream.onWriteComplete" - name: SCOUT_URL value: "{{ .Values.scout.url }}" - name: FUNCTION @@ -67,7 +69,7 @@ spec: value: "{{ .Values.s3.endpoint }}" - name: S3_REGION value: "{{ .Values.s3.region }}" - - name: S3_BUCKET + - name: S3_USC_BUCKET value: "{{ .Values.s3.buckets.usc }}" - name: S3_ACCESS_KEY_ID valueFrom: diff --git a/charts/fp/values.yaml b/charts/fp/values.yaml index 7a13350..89b0092 100644 --- a/charts/fp/values.yaml +++ b/charts/fp/values.yaml @@ -77,8 +77,8 @@ chihaya: scout: imageName: fp/scout replicas: 1 - port: 5134 - url: http://scout.futureporn.svc.cluster.local:5134 + port: 8134 + url: http://scout.futureporn.svc.cluster.local:8134 postgrest: url: http://postgrest.futureporn.svc.cluster.local:9000 image: postgrest/postgrest diff --git a/devbox.json b/devbox.json index 21504db..8e9c5f6 100644 --- a/devbox.json +++ b/devbox.json @@ -11,7 +11,8 @@ "ffmpeg@latest", "yt-dlp@latest", "python310@latest", - "python310Packages.pip@latest" + "python310Packages.pip@latest", + "vips@latest" ], "env": { "DEVBOX_COREPACK_ENABLED": "true", diff --git a/devbox.lock b/devbox.lock index 3005b35..72cd4cc 100644 --- a/devbox.lock +++ b/devbox.lock @@ -680,6 +680,114 @@ } } }, + "vips@latest": { + "last_modified": "2024-09-01T03:39:50Z", + "resolved": "github:NixOS/nixpkgs/4a9443e2a4e06cbaff89056b5cdf6777c1fe5755#vips", + "source": "devbox-search", + "version": "8.15.2", + "systems": { + "aarch64-darwin": { + "outputs": [ + { + "name": "bin", + "path": "/nix/store/06dc4n06zs4jygbqa8c7fg9dcg39q9j9-vips-8.15.2-bin", + "default": true + }, + { + "name": "man", + "path": "/nix/store/442bpcz5cx0x2ddc2cp4041arqf87ph0-vips-8.15.2-man", + "default": true + }, + { + "name": "dev", + "path": "/nix/store/1l68927a29x2g1kyxcf7kxsh0lpnshvn-vips-8.15.2-dev" + }, + { + "name": "out", + "path": "/nix/store/6jnr4f8i012ki83bp0aml84acfigsdqr-vips-8.15.2" + } + ], + "store_path": "/nix/store/06dc4n06zs4jygbqa8c7fg9dcg39q9j9-vips-8.15.2-bin" + }, + "aarch64-linux": { + "outputs": [ + { + "name": "bin", + "path": "/nix/store/17asmchn0w4s7gic59r75n3drqdv12cn-vips-8.15.2-bin", + "default": true + }, + { + "name": "man", + "path": "/nix/store/1syxsf45h4ppyx1q0nynx1wnl4dxrnnv-vips-8.15.2-man", + "default": true + }, + { + "name": "dev", + "path": "/nix/store/afy2j1kdch6sb3ryqljlf163khs7hwkl-vips-8.15.2-dev" + }, + { + "name": "devdoc", + "path": "/nix/store/sifwrcrirz82gv0d9n0a683qz88msfzp-vips-8.15.2-devdoc" + }, + { + "name": "out", + "path": "/nix/store/rhlb3vk0sqy6cn521806qpw1rqzji0iv-vips-8.15.2" + } + ], + "store_path": "/nix/store/17asmchn0w4s7gic59r75n3drqdv12cn-vips-8.15.2-bin" + }, + "x86_64-darwin": { + "outputs": [ + { + "name": "bin", + "path": "/nix/store/gzqa0mv4czbw0c5243r97bhw8kch8qmi-vips-8.15.2-bin", + "default": true + }, + { + "name": "man", + "path": "/nix/store/38knm1dzqngi6w517g147gnpd2q720b1-vips-8.15.2-man", + "default": true + }, + { + "name": "dev", + "path": "/nix/store/f9vf62bnkrn1azgyvcjnmgsh1jknc33b-vips-8.15.2-dev" + }, + { + "name": "out", + "path": "/nix/store/487rhwp65bxzhanzf7brma1lcbis7w6c-vips-8.15.2" + } + ], + "store_path": "/nix/store/gzqa0mv4czbw0c5243r97bhw8kch8qmi-vips-8.15.2-bin" + }, + "x86_64-linux": { + "outputs": [ + { + "name": "bin", + "path": "/nix/store/bfy830xkb34pn4dym88rhzsqvlav25bq-vips-8.15.2-bin", + "default": true + }, + { + "name": "man", + "path": "/nix/store/ppi3phzcnr8bh4q4jnz5pp2grvh1dki9-vips-8.15.2-man", + "default": true + }, + { + "name": "dev", + "path": "/nix/store/6v5dd9lz15dls9k62dqnmdpi2af1y59n-vips-8.15.2-dev" + }, + { + "name": "devdoc", + "path": "/nix/store/9kf40jgp84bkxc6202x8z4nqwzn8627v-vips-8.15.2-devdoc" + }, + { + "name": "out", + "path": "/nix/store/k6aq70ncjh3d0vjcw6r8grw764hlagn5-vips-8.15.2" + } + ], + "store_path": "/nix/store/bfy830xkb34pn4dym88rhzsqvlav25bq-vips-8.15.2-bin" + } + } + }, "yt-dlp@latest": { "last_modified": "2024-07-18T22:08:26Z", "resolved": "github:NixOS/nixpkgs/cfa5366588c940ab6ee3bee399b337175545c664#yt-dlp", diff --git a/dockerfiles/bot.dockerfile b/dockerfiles/bot.dockerfile index e87cfde..2779490 100644 --- a/dockerfiles/bot.dockerfile +++ b/dockerfiles/bot.dockerfile @@ -10,6 +10,7 @@ COPY pnpm-lock.yaml .npmrc package.json . COPY ./services/bot/ ./services/bot/ COPY ./packages/types/ ./packages/types/ COPY ./packages/utils/ ./packages/utils/ +COPY ./packages/fetchers/ ./packages/fetchers/ RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --frozen-lockfile --prefer-offline diff --git a/dockerfiles/capture.dockerfile b/dockerfiles/capture.dockerfile index 20a2b0d..e665e4d 100644 --- a/dockerfiles/capture.dockerfile +++ b/dockerfiles/capture.dockerfile @@ -24,6 +24,7 @@ COPY package.json pnpm-lock.yaml pnpm-workspace.yaml .npmrc . COPY ./services/capture/package.json ./services/capture/pnpm-lock.yaml ./services/capture/ COPY ./packages/types/package.json ./packages/types/pnpm-lock.yaml ./packages/types/ COPY ./packages/utils/package.json ./packages/utils/pnpm-lock.yaml ./packages/utils/ +COPY ./packages/fetchers/package.json ./packages/fetchers/pnpm-lock.yaml ./packages/fetchers/ ## install npm packages RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch @@ -33,6 +34,7 @@ RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --pre COPY ./services/capture/ ./services/capture/ COPY ./packages/types/ ./packages/types/ COPY ./packages/utils/ ./packages/utils/ +COPY ./packages/fetchers/ ./packages/fetchers/ ## Run the build process and generate the artifacts RUN pnpm run -r build diff --git a/dockerfiles/factory.dockerfile b/dockerfiles/factory.dockerfile index 172f3ca..1596c81 100644 --- a/dockerfiles/factory.dockerfile +++ b/dockerfiles/factory.dockerfile @@ -20,6 +20,7 @@ RUN mkdir -p /app/services/factory && mkdir -p /prod/factory ## Copy manfiests, lockfiles, and configs into docker context COPY package.json pnpm-lock.yaml .npmrc . COPY ./packages/utils/pnpm-lock.yaml ./packages/utils/package.json ./packages/utils/ +COPY ./packages/fetchers/package.json ./packages/fetchers/pnpm-lock.yaml ./packages/fetchers/ COPY ./packages/storage/pnpm-lock.yaml ./packages/storage/package.json ./packages/storage/ COPY ./packages/types/pnpm-lock.yaml ./packages/types/package.json ./packages/types/ COPY ./services/factory/pnpm-lock.yaml ./services/factory/package.json ./services/factory/ @@ -31,6 +32,7 @@ RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install -g node-gyp --pre RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --frozen-lockfile --prefer-offline ## Copy package code into docker context COPY ./packages/utils/ ./packages/utils/ +COPY ./packages/fetchers/ ./packages/fetchers/ RUN ls -la /app/packages/utils/node_modules/prevvy/ RUn cat ./packages/utils/package.json COPY ./packages/storage/ ./packages/storage/ diff --git a/dockerfiles/next.dockerfile b/dockerfiles/next.dockerfile index b3c4766..dcacc07 100644 --- a/dockerfiles/next.dockerfile +++ b/dockerfiles/next.dockerfile @@ -24,6 +24,7 @@ RUN pnpm fetch # COPY pnpm-lock.yaml .npmrc package.json . COPY ./services/next ./services/next COPY ./packages/types ./packages/types +COPY ./packages/fetchers ./packages/fetchers # COPY ./packages/strapi ./packages/strapi # COPY ./packages/utils ./packages/utils diff --git a/dockerfiles/scout.dockerfile b/dockerfiles/scout.dockerfile index 709767b..18f6cd3 100644 --- a/dockerfiles/scout.dockerfile +++ b/dockerfiles/scout.dockerfile @@ -22,6 +22,7 @@ COPY pnpm-lock.yaml .npmrc package.json . COPY ./services/scout/ ./services/scout/ COPY ./packages/types/ ./packages/types/ COPY ./packages/utils/ ./packages/utils/ +COPY ./packages/fetchers/ ./packages/fetchers/ RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --frozen-lockfile --prefer-offline diff --git a/dockerfiles/worker.dockerfile b/dockerfiles/worker.dockerfile deleted file mode 100644 index a08550e..0000000 --- a/dockerfiles/worker.dockerfile +++ /dev/null @@ -1,62 +0,0 @@ -## d.worker.dockerfile -## -## @futureporn/worker is the system component which runs background tasks. -## Tasks such as thumbnail generation, video encoding, file transfers, etc. -## -## @todo future improvement might be merging the dockerfiles for the various monorepo packages. -## this is not an easy task, so I'm not doing it right now. -## "make it work, make it right, make it fast" (in that order) -## Right now we are making things work with separate dockerfiles for each package. -## One thing to determine is build speed. If we're developing in Tilt and have to wait 20 minutes for the build to complete -## every time we change a file in any dependent package, then merging dockerfiles is not desirable. -## One of the slow parts of the docker build is copying all package directories into the build context. -## If we have a lot of packages, it takes a long time. -## I have yet to determine performance benchmarks, so it's unclear if merging dockerfiles is desirable. -## -## @todo another performance improvement would almost certainly be to move strapi, next, and similar packages from `packages/*` into `services/*` -## this way, when we're building the various @futureporn library-type packages, we don't have to filter and COPY the dependency packages one-by-one. -## instead, we add the entire `packages/*` directory and then move on to the next step. - -FROM node:20 AS base -ENV PNPM_HOME="/pnpm" -ENV PATH="$PNPM_HOME:$PATH" -WORKDIR /app - -FROM base AS build -WORKDIR /app -RUN mkdir -p /app/packages/worker && mkdir -p /prod/worker - -## Copy manfiests, lockfiles, and configs into docker context -COPY package.json pnpm-lock.yaml .npmrc . -RUN corepack enable && corepack prepare pnpm@9.6.0 --activate -# COPY ./packages/storage/pnpm-lock.yaml ./packages/storage/package.json ./packages/storage/ -# COPY ./packages/types/pnpm-lock.yaml ./packages/types/package.json ./packages/types/ -# COPY ./packages/utils/pnpm-lock.yaml ./packages/utils/package.json ./packages/utils/ -COPY ./packages/worker/pnpm-lock.yaml ./packages/worker/package.json ./packages/worker/ - -## Install npm packages -RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch -## we install node-gyp explicitly in order for sharp to install properly -RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install -g node-gyp --prefer-offline -RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --frozen-lockfile --prefer-offline - -## Copy package code into docker context -# COPY ./packages/storage/ ./packages/storage/ -# COPY ./packages/types/ ./packages/types/ -# COPY ./packages/utils/ ./packages/utils/ -COPY ./packages/worker/ ./packages/worker/ - -## Transpile TS into JS -RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm -r build - -## Copy all production code into one place -## `pnpm deploy` copies all dependencies into an isolated node_modules directory inside the target dir -## @see https://pnpm.io/cli/deploy -RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm deploy --filter=@futureporn/worker --prod /prod/worker - - -FROM base AS worker -COPY --from=build /prod/worker . -RUN ls -la . -ENTRYPOINT ["pnpm", "start"] - diff --git a/packages/fetchers/README.md b/packages/fetchers/README.md new file mode 100644 index 0000000..85cc7df --- /dev/null +++ b/packages/fetchers/README.md @@ -0,0 +1,5 @@ +# @futureporn/fetchers + +`fetch()` wrappers for getting/setting values in our database. + +distinct from @futureporn/scout which is fetching data from external sources. \ No newline at end of file diff --git a/packages/fetchers/package.json b/packages/fetchers/package.json new file mode 100644 index 0000000..7ba4394 --- /dev/null +++ b/packages/fetchers/package.json @@ -0,0 +1,23 @@ +{ + "name": "@futureporn/fetchers", + "type": "module", + "version": "1.0.0", + "description": "", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 0" + }, + "exports": { + "./*.ts": "./src/*.ts" + }, + "keywords": [], + "author": "", + "license": "Unlicense", + "dependencies": { + "@futureporn/types": "workspace:^" + }, + "devDependencies": { + "@types/chai": "^4.3.19", + "chai": "^5.1.1" + } +} diff --git a/packages/fetchers/pnpm-lock.yaml b/packages/fetchers/pnpm-lock.yaml new file mode 100644 index 0000000..21e05c5 --- /dev/null +++ b/packages/fetchers/pnpm-lock.yaml @@ -0,0 +1,77 @@ +lockfileVersion: '9.0' + +settings: + autoInstallPeers: true + excludeLinksFromLockfile: false + +importers: + + .: + dependencies: + '@futureporn/types': + specifier: workspace:^ + version: link:../types + devDependencies: + '@types/chai': + specifier: ^4.3.19 + version: 4.3.19 + chai: + specifier: ^5.1.1 + version: 5.1.1 + +packages: + + '@types/chai@4.3.19': + resolution: {integrity: sha512-2hHHvQBVE2FiSK4eN0Br6snX9MtolHaTo/batnLjlGRhoQzlCL61iVpxoqO7SfFyOw+P/pwv+0zNHzKoGWz9Cw==} + + assertion-error@2.0.1: + resolution: {integrity: sha512-Izi8RQcffqCeNVgFigKli1ssklIbpHnCYc6AknXGYoB6grJqyeby7jv12JUQgmTAnIDnbck1uxksT4dzN3PWBA==} + engines: {node: '>=12'} + + chai@5.1.1: + resolution: {integrity: sha512-pT1ZgP8rPNqUgieVaEY+ryQr6Q4HXNg8Ei9UnLUrjN4IA7dvQC5JB+/kxVcPNDHyBcc/26CXPkbNzq3qwrOEKA==} + engines: {node: '>=12'} + + check-error@2.1.1: + resolution: {integrity: sha512-OAlb+T7V4Op9OwdkjmguYRqncdlx5JiofwOAUkmTF+jNdHwzTaTs4sRAGpzLF3oOz5xAyDGrPgeIDFQmDOTiJw==} + engines: {node: '>= 16'} + + deep-eql@5.0.2: + resolution: {integrity: sha512-h5k/5U50IJJFpzfL6nO9jaaumfjO/f2NjK/oYB2Djzm4p9L+3T9qWpZqZ2hAbLPuuYq9wrU08WQyBTL5GbPk5Q==} + engines: {node: '>=6'} + + get-func-name@2.0.2: + resolution: {integrity: sha512-8vXOvuE167CtIc3OyItco7N/dpRtBbYOsPsXCz7X/PMnlGjYjSGuZJgM1Y7mmew7BKf9BqvLX2tnOVy1BBUsxQ==} + + loupe@3.1.1: + resolution: {integrity: sha512-edNu/8D5MKVfGVFRhFf8aAxiTM6Wumfz5XsaatSxlD3w4R1d/WEKUTydCdPGbl9K7QG/Ca3GnDV2sIKIpXRQcw==} + + pathval@2.0.0: + resolution: {integrity: sha512-vE7JKRyES09KiunauX7nd2Q9/L7lhok4smP9RZTDeD4MVs72Dp2qNFVz39Nz5a0FVEW0BJR6C0DYrq6unoziZA==} + engines: {node: '>= 14.16'} + +snapshots: + + '@types/chai@4.3.19': {} + + assertion-error@2.0.1: {} + + chai@5.1.1: + dependencies: + assertion-error: 2.0.1 + check-error: 2.1.1 + deep-eql: 5.0.2 + loupe: 3.1.1 + pathval: 2.0.0 + + check-error@2.1.1: {} + + deep-eql@5.0.2: {} + + get-func-name@2.0.2: {} + + loupe@3.1.1: + dependencies: + get-func-name: 2.0.2 + + pathval@2.0.0: {} diff --git a/packages/fetchers/src/config.ts b/packages/fetchers/src/config.ts new file mode 100644 index 0000000..3052853 --- /dev/null +++ b/packages/fetchers/src/config.ts @@ -0,0 +1,27 @@ +const requiredEnvVars = [ + 'POSTGREST_URL', + 'AUTOMATION_USER_JWT', + 'SCOUT_URL', +] as const; + +const getEnvVar = (key: typeof requiredEnvVars[number]): string => { + const value = process.env[key]; + if (!value) { + throw new Error(`Missing ${key} env var`); + } + return value; +}; + +export interface Config { + postgrestUrl: string; + automationUserJwt: string; + scoutUrl: string; +} + + +export const configs: Config = { + scoutUrl: getEnvVar('SCOUT_URL'), + postgrestUrl: getEnvVar('POSTGREST_URL'), + automationUserJwt: getEnvVar('AUTOMATION_USER_JWT') +} + diff --git a/services/factory/src/fetchers/createS3File.ts b/packages/fetchers/src/createS3File.ts similarity index 94% rename from services/factory/src/fetchers/createS3File.ts rename to packages/fetchers/src/createS3File.ts index dcdfa88..03698e0 100644 --- a/services/factory/src/fetchers/createS3File.ts +++ b/packages/fetchers/src/createS3File.ts @@ -1,4 +1,4 @@ -import { configs } from "../config" +import { configs } from "../../../services/factory/src/config" import type { S3FileRecord, S3FileResponse } from '@futureporn/types' export default async function createS3File(s3File?: S3FileRecord): Promise { diff --git a/services/capture/src/fetchers/createSegmentInDatabase.ts b/packages/fetchers/src/createSegmentInDatabase.ts similarity index 80% rename from services/capture/src/fetchers/createSegmentInDatabase.ts rename to packages/fetchers/src/createSegmentInDatabase.ts index c7ffd50..c0dcdd0 100644 --- a/services/capture/src/fetchers/createSegmentInDatabase.ts +++ b/packages/fetchers/src/createSegmentInDatabase.ts @@ -1,15 +1,14 @@ -import type { Helpers } from 'graphile-worker' -import { configs } from '../config.ts' +import { configs } from './config.ts' import querystring from 'node:querystring' -export default async function createSegmentInDatabase(s3_key: string, vod_id: string, helpers: Helpers): Promise { +export default async function createSegmentInDatabase(s3_key: string, vod_id: string): Promise { if (!s3_key) throw new Error('getSegments requires {string} s3_key as first arg'); const segmentPayload = { s3_key, vod_id } - helpers.logger.info(`Creating segment with s3_key=${s3_key}. payload as follows`) - helpers.logger.info(JSON.stringify(segmentPayload)) + // helpers.logger.info(`Creating segment with s3_key=${s3_key}. payload as follows`) + // helpers.logger.info(JSON.stringify(segmentPayload)) const res = await fetch(`${configs.postgrestUrl}/segments`, { method: 'POST', headers: { @@ -23,7 +22,7 @@ export default async function createSegmentInDatabase(s3_key: string, vod_id: st if (!res.ok) { const body = await res.text() const msg = `failed to create Segment. status=${res.status}, statusText=${res.statusText}, body=${body}` - helpers.logger.error(msg) + console.error(msg) throw new Error(msg); } const location = res.headers.get('location') diff --git a/services/capture/src/fetchers/createSegmentsVodLink.ts b/packages/fetchers/src/createSegmentsVodLink.ts similarity index 84% rename from services/capture/src/fetchers/createSegmentsVodLink.ts rename to packages/fetchers/src/createSegmentsVodLink.ts index b030bb1..6098d93 100644 --- a/services/capture/src/fetchers/createSegmentsVodLink.ts +++ b/packages/fetchers/src/createSegmentsVodLink.ts @@ -1,9 +1,9 @@ -import type { Helpers } from 'graphile-worker' -import { configs } from '../config.ts' +// import type { Helpers } from 'graphile-worker' +import { configs } from '../../../services/capture/src/config.ts' import querystring from 'node:querystring' -export default async function createSegmentsVodLink(vod_id: string, segment_id: string, helpers: Helpers): Promise { - helpers.logger.info(`createSegmentsVodLink with vod_id=${vod_id}, segment_id=${segment_id}`) +export default async function createSegmentsVodLink(vod_id: string, segment_id: string): Promise { + console.info(`createSegmentsVodLink with vod_id=${vod_id}, segment_id=${segment_id}`) if (!vod_id) throw new Error('createSegmentsVodLink requires {string} vod_id as first arg'); if (!segment_id) throw new Error('createSegmentsVodLink requires {string} segment_id as second arg'); const segmentVodLinkPayload = { @@ -32,5 +32,5 @@ export default async function createSegmentsVodLink(vod_id: string, segment_id: if (Array.isArray(segmentsId)) throw new Error('segments_vod_links was an array which is unexpected'); const id = segmentsId.split('.').at(-1) if (!id) throw new Error('failed to get id '); - return parseInt(id) + return id } diff --git a/services/bot/src/fetchers/createStreamInDatabase.ts b/packages/fetchers/src/createStreamInDatabase.ts similarity index 93% rename from services/bot/src/fetchers/createStreamInDatabase.ts rename to packages/fetchers/src/createStreamInDatabase.ts index 1315a25..fbaf483 100644 --- a/services/bot/src/fetchers/createStreamInDatabase.ts +++ b/packages/fetchers/src/createStreamInDatabase.ts @@ -1,4 +1,4 @@ -import { configs } from '../config.ts' +import { configs } from '../../../services/bot/src/config.ts' export default async function createStreamInDatabase(url: string, discordMessageId: string): Promise { const streamPayload = { diff --git a/services/factory/src/fetchers/createVod.ts b/packages/fetchers/src/createVod.ts similarity index 82% rename from services/factory/src/fetchers/createVod.ts rename to packages/fetchers/src/createVod.ts index 13f39df..c2132ad 100644 --- a/services/factory/src/fetchers/createVod.ts +++ b/packages/fetchers/src/createVod.ts @@ -1,7 +1,7 @@ -import { configs } from "../config" -import type { Vod, Stream } from '@futureporn/types' +import { configs } from "../../../services/factory/src/config" +import type { VodResponse, Stream } from '@futureporn/types' -export default async function createVod(stream?: Stream): Promise { +export default async function createVod(stream?: Stream): Promise { if (!stream) throw new Error(`first argument passed to createVod must be a {Stream}`); console.log(stream) const url = `${configs.postgrestUrl}/vods` @@ -26,7 +26,7 @@ export default async function createVod(stream?: Stream): Promise { const body = await res.json() throw new Error(`Problem during createVod. res.status=${res.status}, res.statusText=${res.statusText}, body=${JSON.stringify(body)}`) } - const json = await res.json() as Vod[] + const json = await res.json() as VodResponse[] const vod = json[0] if (!vod) return null else return vod diff --git a/services/bot/src/fetchers/findOrCreateStream.ts b/packages/fetchers/src/findOrCreateStream.ts similarity index 95% rename from services/bot/src/fetchers/findOrCreateStream.ts rename to packages/fetchers/src/findOrCreateStream.ts index ddf06ea..727a9ab 100644 --- a/services/bot/src/fetchers/findOrCreateStream.ts +++ b/packages/fetchers/src/findOrCreateStream.ts @@ -1,7 +1,7 @@ -import { configs } from "../config.ts" +import { configs } from "../../../services/bot/src/config.ts" import type { Stream, VodRecord } from "@futureporn/types" import { sub } from 'date-fns' -import { bot } from "../bot.ts" +import { bot } from "../../../services/bot/src/bot.ts" export async function findStream(vtuberId: string, lteDate: Date, gteDate: Date): Promise { const fetchUrl = `${configs.postgrestUrl}/streams?select=id&vtuber=eq.${vtuberId}&date=gte.${gteDate.toISOString()}&date=lte.${lteDate.toISOString()}` diff --git a/services/bot/src/fetchers/findOrCreateVtuber.spec.ts b/packages/fetchers/src/findOrCreateVtuber.spec.ts similarity index 100% rename from services/bot/src/fetchers/findOrCreateVtuber.spec.ts rename to packages/fetchers/src/findOrCreateVtuber.spec.ts diff --git a/services/bot/src/fetchers/findOrCreateVtuber.ts b/packages/fetchers/src/findOrCreateVtuber.ts similarity index 96% rename from services/bot/src/fetchers/findOrCreateVtuber.ts rename to packages/fetchers/src/findOrCreateVtuber.ts index f6f16f0..322c39b 100644 --- a/services/bot/src/fetchers/findOrCreateVtuber.ts +++ b/packages/fetchers/src/findOrCreateVtuber.ts @@ -1,6 +1,6 @@ -import { configs } from "../config.ts" +import { configs } from "../../../services/bot/src/config.ts" import type { VtuberRecord, VtuberResponse } from "@futureporn/types" -import { bot } from '../bot.ts' +import { bot } from '../../../services/bot/src/bot.ts' import qs from 'qs' interface vTuberSearchQuery { diff --git a/services/bot/src/fetchers/findVod.ts b/packages/fetchers/src/findVod.ts similarity index 78% rename from services/bot/src/fetchers/findVod.ts rename to packages/fetchers/src/findVod.ts index 98bee01..0b58b50 100644 --- a/services/bot/src/fetchers/findVod.ts +++ b/packages/fetchers/src/findVod.ts @@ -1,12 +1,11 @@ import type { VodResponse } from "@futureporn/types"; -import { bot } from "../bot.ts"; -import { configs } from "../config.ts"; +import { configs } from "./config.ts"; export default async function findVod({ vod_id, discord_message_id }: { vod_id?: string, discord_message_id?: string }): Promise { const fetchUrl = (!!vod_id) - ? `${configs.postgrestUrl}/vods?id=eq.${vod_id}&select=*,segments(bytes,updated_at,created_at)` + ? `${configs.postgrestUrl}/vods?id=eq.${vod_id}&select=*,segments(bytes,updated_at,created_at,s3_key)` : `${configs.postgrestUrl}/vods?discord_message_id=eq.${discord_message_id}&select=*,segments(bytes,updated_at,created_at)` - bot.logger.info(fetchUrl) + console.info(fetchUrl) const fetchOptions = { method: 'GET', headers: { @@ -18,11 +17,11 @@ export default async function findVod({ vod_id, discord_message_id }: { vod_id?: const res = await fetch(fetchUrl, fetchOptions) if (!res.ok) { const msg = `request failed. status=${res.status}, statusText=${res.statusText}` - bot.logger.error(msg) + console.error(msg) throw new Error(msg) } const json = await res.json() as VodResponse[] - // bot.logger.info(`vod results as follows.`) - // bot.logger.info(json) + // console.info(`vod results as follows.`) + // console.info(json) return json?.at(0) || null } \ No newline at end of file diff --git a/services/capture/src/fetchers/getPlaylistUrl.ts b/packages/fetchers/src/getPlaylistUrl.ts similarity index 90% rename from services/capture/src/fetchers/getPlaylistUrl.ts rename to packages/fetchers/src/getPlaylistUrl.ts index e6cfcd7..b653b98 100644 --- a/services/capture/src/fetchers/getPlaylistUrl.ts +++ b/packages/fetchers/src/getPlaylistUrl.ts @@ -1,4 +1,4 @@ -import { configs } from '../config.ts' +import { configs } from '../../../services/capture/src/config.ts' export default async function getPlaylistUrl (url: string): Promise { if (!url) throw new Error(`getPlaylistUrl requires a url, but it was undefined.`); diff --git a/services/capture/src/fetchers/getSegmentsFromDatabase.ts b/packages/fetchers/src/getSegmentsFromDatabase.ts similarity index 78% rename from services/capture/src/fetchers/getSegmentsFromDatabase.ts rename to packages/fetchers/src/getSegmentsFromDatabase.ts index 8a80177..94f8e7c 100644 --- a/services/capture/src/fetchers/getSegmentsFromDatabase.ts +++ b/packages/fetchers/src/getSegmentsFromDatabase.ts @@ -1,15 +1,13 @@ -import type { Segment } from '@futureporn/types' -import type { Helpers } from 'graphile-worker' -import { configs } from '../config.ts' +import { configs } from '../../../services/capture/src/config.ts' import querystring from 'node:querystring' -export default async function getSegmentsFromDatabase(s3_key: string, helpers: Helpers): Promise { +export default async function getSegmentsFromDatabase(s3_key: string): Promise { if (!s3_key) throw new Error('getSegments requires {string} s3_key as first arg'); const segmentPayload = { s3_key } - helpers.logger.info(`Creating segment with s3_key=${s3_key}. payload as follows`) - helpers.logger.info(JSON.stringify(segmentPayload)) + console.info(`Creating segment with s3_key=${s3_key}. payload as follows`) + console.info(JSON.stringify(segmentPayload)) const res = await fetch(`${configs.postgrestUrl}/segments`, { method: 'POST', headers: { @@ -23,7 +21,7 @@ export default async function getSegmentsFromDatabase(s3_key: string, helpers: H if (!res.ok) { const body = await res.text() const msg = `failed to create Segment. status=${res.status}, statusText=${res.statusText}, body=${body}` - helpers.logger.error(msg) + console.error(msg) throw new Error(msg); } const location = res.headers.get('location') diff --git a/services/bot/src/fetchers/getStreamFromDatabase.ts b/packages/fetchers/src/getStreamFromDatabase.ts similarity index 90% rename from services/bot/src/fetchers/getStreamFromDatabase.ts rename to packages/fetchers/src/getStreamFromDatabase.ts index 4604549..819d57d 100644 --- a/services/bot/src/fetchers/getStreamFromDatabase.ts +++ b/packages/fetchers/src/getStreamFromDatabase.ts @@ -1,6 +1,6 @@ -import { configs } from "../config.ts" +import { configs } from "../../../services/bot/src/config.ts" import type { Stream } from '@futureporn/types' -import { bot } from '../bot.ts' +import { bot } from '../../../services/bot/src/bot.ts' export default async function getStreamFromDatabase(messageId: string): Promise { diff --git a/services/bot/src/fetchers/getStreamIdFromMessage.ts b/packages/fetchers/src/getStreamIdFromMessage.ts similarity index 100% rename from services/bot/src/fetchers/getStreamIdFromMessage.ts rename to packages/fetchers/src/getStreamIdFromMessage.ts diff --git a/services/bot/src/fetchers/getUrlFromMessage.ts b/packages/fetchers/src/getUrlFromMessage.ts similarity index 94% rename from services/bot/src/fetchers/getUrlFromMessage.ts rename to packages/fetchers/src/getUrlFromMessage.ts index fe55078..cfb903e 100644 --- a/services/bot/src/fetchers/getUrlFromMessage.ts +++ b/packages/fetchers/src/getUrlFromMessage.ts @@ -1,5 +1,5 @@ import { type Interaction } from "discordeno" -import { configs } from "../config.ts" +import { configs } from "../../../services/bot/src/config.ts" import { type Stream } from "@futureporn/types" import { logger } from "@discordeno/bot" diff --git a/services/factory/src/fetchers/getVod.ts b/packages/fetchers/src/getVod.ts similarity index 62% rename from services/factory/src/fetchers/getVod.ts rename to packages/fetchers/src/getVod.ts index 818417b..c3caa00 100644 --- a/services/factory/src/fetchers/getVod.ts +++ b/packages/fetchers/src/getVod.ts @@ -1,9 +1,7 @@ -import { configs } from "../config" -import { Helpers } from "graphile-worker" -import { Stream } from "@futureporn/types" +import { configs } from "./config" import type { VodResponse } from "@futureporn/types" -export default async function getVod(vodId: string, helpers: Helpers) { +export default async function getVod(vodId: string) { const url = `${configs.postgrestUrl}/vods?select=*,segments(*)&id=eq.${vodId}` try { const res = await fetch(url) @@ -14,11 +12,11 @@ export default async function getVod(vodId: string, helpers: Helpers) { if (!body[0]) throw new Error('body[0] was expected to be Vod data, but it was either null or undefined.'); return body[0]; } catch (e) { - helpers.logger.error(`encountered an error during getVod()`) + console.error(`encountered an error during getVod()`) if (e instanceof Error) { - helpers.logger.error(e.message) + console.error(e.message) } else { - helpers.logger.error(JSON.stringify(e)) + console.error(JSON.stringify(e)) } return null } diff --git a/services/bot/src/fetchers/patchVod.ts b/packages/fetchers/src/patchVod.ts similarity index 92% rename from services/bot/src/fetchers/patchVod.ts rename to packages/fetchers/src/patchVod.ts index c5e6829..9e6dcdd 100644 --- a/services/bot/src/fetchers/patchVod.ts +++ b/packages/fetchers/src/patchVod.ts @@ -1,5 +1,5 @@ import type { Stream } from "@futureporn/types"; -import { configs } from "../config.ts"; +import { configs } from "../../../services/bot/src/config.ts"; import { logger } from "@discordeno/bot"; export default async function patchVod(stream_id: string, payload: Partial): Promise { diff --git a/services/factory/src/fetchers/patchVodInDatabase.ts b/packages/fetchers/src/patchVodInDatabase.ts similarity index 91% rename from services/factory/src/fetchers/patchVodInDatabase.ts rename to packages/fetchers/src/patchVodInDatabase.ts index 53e4fe8..7240da1 100644 --- a/services/factory/src/fetchers/patchVodInDatabase.ts +++ b/packages/fetchers/src/patchVodInDatabase.ts @@ -1,5 +1,5 @@ import type { VodRecord } from "@futureporn/types"; -import { configs } from "../config.ts"; +import { configs } from "../../../services/factory/src/config.ts"; export default async function patchVodInDatabase(vod_id: string, payload: Partial): Promise { diff --git a/services/capture/src/fetchers/updateSegmentInDatabase.ts b/packages/fetchers/src/updateSegmentInDatabase.ts similarity index 76% rename from services/capture/src/fetchers/updateSegmentInDatabase.ts rename to packages/fetchers/src/updateSegmentInDatabase.ts index bd8a730..84b5fa0 100644 --- a/services/capture/src/fetchers/updateSegmentInDatabase.ts +++ b/packages/fetchers/src/updateSegmentInDatabase.ts @@ -1,6 +1,5 @@ import type { SegmentResponse } from '@futureporn/types' -import type { Helpers } from 'graphile-worker' -import { configs } from '../config.ts' +import { configs } from '../../../services/capture/src/config.ts' /** @@ -13,11 +12,9 @@ import { configs } from '../config.ts' export default async function updateSegmentInDatabase({ segment_id, fileSize, - helpers }: { segment_id: string, fileSize: number, - helpers: Helpers }): Promise { const payload: any = { @@ -25,7 +22,7 @@ export default async function updateSegmentInDatabase({ } const fetchUrl =`${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=vod:vods(is_recording_aborted)` - // helpers.logger.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`) + console.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`) const res = await fetch(fetchUrl, { method: 'PATCH', headers: { @@ -39,14 +36,14 @@ export default async function updateSegmentInDatabase({ if (!res.ok) { const body = await res.text() const msg = `failed to updateSegmentInDatabase. status=${res.status}, statusText=${res.statusText}, body=${body}` - helpers.logger.error(msg) + console.error(msg) throw new Error(msg); } - // helpers.logger.info(`response was OK~`) + // console.info(`response was OK~`) const body = await res.json() as SegmentResponse[]; if (!body[0]) throw new Error(`failed to get a segment that matched segment_id=${segment_id}`); const bod = body[0] - // helpers.logger.info('the following was the response from PATCH-ing /segments') - // helpers.logger.info(JSON.stringify(bod)) + // console.info('the following was the response from PATCH-ing /segments') + // console.info(JSON.stringify(bod)) return bod } \ No newline at end of file diff --git a/packages/storage/package.json b/packages/storage/package.json index 68be6af..4f6ae1f 100644 --- a/packages/storage/package.json +++ b/packages/storage/package.json @@ -16,7 +16,7 @@ "author": "@CJ_Clippy", "license": "Unlicense", "dependencies": { - "@aws-sdk/client-s3": "^3.637.0", + "@aws-sdk/client-s3": "3.637.0", "@aws-sdk/lib-storage": "^3.637.0", "@futureporn/types": "workspace:^", "@paralleldrive/cuid2": "^2.2.2", diff --git a/packages/storage/pnpm-lock.yaml b/packages/storage/pnpm-lock.yaml index 186ad83..1e264c2 100644 --- a/packages/storage/pnpm-lock.yaml +++ b/packages/storage/pnpm-lock.yaml @@ -9,7 +9,7 @@ importers: .: dependencies: '@aws-sdk/client-s3': - specifier: ^3.637.0 + specifier: 3.637.0 version: 3.637.0 '@aws-sdk/lib-storage': specifier: ^3.637.0 diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index d5dac71..e845819 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -144,12 +144,12 @@ export interface VodResponse { title?: string; date: string; mux_asset: MuxAssetRecord; - thumbnail?: S3File; + thumbnail?: S3FileResponse; vtuber: VtuberRecord; tags?: TagRecord[]; timestamps?: Timestamp[]; ipfs_cid?: string; - s3_file?: S3File; + s3_file?: S3FileResponse; torrent?: string; announce_title?: string; announce_url?: string; @@ -174,7 +174,7 @@ export interface Stream { updated_at: Date; vtuber: string; tweet: string; - vods?: Vod[]; + vods?: VodResponse[]; archive_status: ArchiveStatus; is_chaturbate_stream: Boolean; is_fansly_stream: Boolean; diff --git a/packages/utils/src/name.ts b/packages/utils/src/name.ts index 815b6ef..6ec458d 100644 --- a/packages/utils/src/name.ts +++ b/packages/utils/src/name.ts @@ -7,4 +7,6 @@ export function fpSlugify(str: string): string { locale: 'en', trim: true, }); -} \ No newline at end of file +} + +export const ua0 = 'Mozilla/5.0 (X11; Linux x86_64; rv:105.0) Gecko/20100101 Firefox/105.0' \ No newline at end of file diff --git a/services/bot/crontab b/services/bot/crontab index 6a5d8ce..5b3b3c6 100644 --- a/services/bot/crontab +++ b/services/bot/crontab @@ -15,4 +15,4 @@ ## every 1 minutes, we see which /vods are stale and we mark them as such. ## this prevents stalled Record updates by marking stalled recordings as stopped -* * * * * update_vod_statuses ?max=1 { stalled_minutes:1, finished_minutes:2 } \ No newline at end of file +* * * * * update_vod_statuses ?max=1 { stalled_minutes:2, finished_minutes:3 } \ No newline at end of file diff --git a/services/bot/package.json b/services/bot/package.json index b6f8a12..c811eb5 100644 --- a/services/bot/package.json +++ b/services/bot/package.json @@ -25,6 +25,7 @@ "dependencies": { "@discordeno/bot": "19.0.0-next.b1bfe94", "@discordeno/rest": "19.0.0-next.b3a8c86", + "@futureporn/fetchers": "workspace:^", "@paralleldrive/cuid2": "^2.2.2", "@types/node": "^22.5.2", "@types/qs": "^6.9.15", diff --git a/services/bot/pnpm-lock.yaml b/services/bot/pnpm-lock.yaml index 4ea9f0d..fb88465 100644 --- a/services/bot/pnpm-lock.yaml +++ b/services/bot/pnpm-lock.yaml @@ -14,6 +14,9 @@ importers: '@discordeno/rest': specifier: 19.0.0-next.b3a8c86 version: 19.0.0-next.b3a8c86 + '@futureporn/fetchers': + specifier: workspace:^ + version: link:../../packages/fetchers '@paralleldrive/cuid2': specifier: ^2.2.2 version: 2.2.2 diff --git a/services/bot/src/commands/process.ts b/services/bot/src/commands/process.ts index f7b69b2..6f16237 100644 --- a/services/bot/src/commands/process.ts +++ b/services/bot/src/commands/process.ts @@ -1,11 +1,11 @@ import { ApplicationCommandTypes, logger, type Interaction } from '@discordeno/bot' import { createCommand } from '../commands.ts' -import getStreamFromDatabase from '../fetchers/getStreamFromDatabase.ts' -import patchVod from '../fetchers/patchVod.ts' +import getStreamFromDatabase from '@futureporn/fetchers/getStreamFromDatabase.ts' +import patchVod from '@futureporn/fetchers/patchVod.ts' import { quickAddJob, type WorkerUtilsOptions } from 'graphile-worker' import { configs } from '../config.ts' -import findVod from '../fetchers/findVod.ts' +import findVod from '@futureporn/fetchers/findVod.ts' function throwErr(msg: string) { logger.error(msg) diff --git a/services/bot/src/commands/record.ts b/services/bot/src/commands/record.ts index b07e713..c5f54df 100644 --- a/services/bot/src/commands/record.ts +++ b/services/bot/src/commands/record.ts @@ -11,10 +11,10 @@ import { rbacAllow } from '../middlewares/rbac.ts' import { createCommand } from '../commands.ts' import { configs } from '../config.ts' import type { Stream } from '@futureporn/types' -import createStreamInDatabase from '../fetchers/createStreamInDatabase.ts' -import createVod from '../fetchers/createVod.ts' -import findOrCreateVtuber from '../fetchers/findOrCreateVtuber.ts' -import findOrCreateStream from '../fetchers/findOrCreateStream.ts' +import createStreamInDatabase from '@futureporn/fetchers/createStreamInDatabase.ts' +import createVod from '@futureporn/fetchers/createVod.ts' +import findOrCreateVtuber from '@futureporn/fetchers/findOrCreateVtuber.ts' +import findOrCreateStream from '@futureporn/fetchers/findOrCreateStream.ts' /** diff --git a/services/bot/src/fetchers/createVod.ts b/services/bot/src/fetchers/createVod.ts deleted file mode 100644 index 232e015..0000000 --- a/services/bot/src/fetchers/createVod.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { configs } from "../config.ts" -import type { VodRecord } from "@futureporn/types" - -export default async function createVod(payload: Partial): Promise { - const vodPayload = { - date: new Date().toISOString() - } - const res = await fetch(`${configs.postgrestUrl}/vods`, { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - 'Prefer': 'return=headers-only', - 'Authorization': `Bearer ${configs.automationUserJwt}`, - }, - body: JSON.stringify(Object.assign(vodPayload, payload)) - }) - if (!res.ok) { - const status = res.status - const statusText = res.statusText - const body = await res.text() - const msg = `Failed to create vod in database. status=${status}, statusText=${statusText}, body=${body}` - console.error(msg) - throw new Error(msg) - } - const id = res.headers.get('location')?.split('.').at(-1) - if (!id) throw new Error('id could not be parsed from location header'); - return id -} \ No newline at end of file diff --git a/services/bot/src/tasks/update_discord_message.ts b/services/bot/src/tasks/update_discord_message.ts index 199228e..900d0ae 100644 --- a/services/bot/src/tasks/update_discord_message.ts +++ b/services/bot/src/tasks/update_discord_message.ts @@ -1,7 +1,7 @@ import 'dotenv/config' -import type { Status, Stream, SegmentResponse, VodRecord, VodResponse } from '@futureporn/types' +import type { Status, SegmentResponse, VodResponse } from '@futureporn/types' import { type Task, type Helpers } from 'graphile-worker' -import { intervalToDuration, formatDuration, isBefore, sub, max } from 'date-fns' +import { intervalToDuration, formatDuration } from 'date-fns' import prettyBytes from 'pretty-bytes' import { EmbedsBuilder, @@ -13,7 +13,7 @@ import { } from '@discordeno/bot' import { bot } from '../bot.ts' import { configs } from '../config.ts' -import findVod from '../fetchers/findVod.ts' +import findVod from '@futureporn/fetchers/findVod.ts' const yeahEmojiId = BigInt('1253191939461873756') @@ -89,6 +89,7 @@ function getEmbeds(vod: VodResponse, helpers: Helpers) { { name: 'Status', value: status.charAt(0).toUpperCase()+status.slice(1), inline: true }, // { name: 'Filesize', value: prettyBytes(fileSize), inline: true }, // filesize isn't on stream. filesize is on segment. keeping for reference. @todo { name: 'URL', value: url, inline: false }, + { name: 'CDN2', value: '@todo', inline: false }, ]) if (status === 'pending_recording') { embeds @@ -132,12 +133,14 @@ function getEmbeds(vod: VodResponse, helpers: Helpers) { .setFields(segments.sort((a, b) => new Date(a.created_at).getTime() - new Date(b.created_at).getTime()).map((s, i) => ( { name: `Segment ${i+1}`, - value: `${getDuration(s)} (${prettyBytes(s.bytes)})`, + value: `${getDuration(s)} (${prettyBytes(s.bytes)}) (${s.s3_key})`, inline: false } ))) } + // Add an embed for S3 files + // Add an Embed for processing tasks if (status === 'processing') { const tasks = [ diff --git a/services/bot/src/tasks/update_vod_statuses.test.ts b/services/bot/src/tasks/update_vod_statuses.test.ts new file mode 100644 index 0000000..4dc70c2 --- /dev/null +++ b/services/bot/src/tasks/update_vod_statuses.test.ts @@ -0,0 +1,11 @@ +import { updateStalledVods } from "./update_vod_statuses.ts" + +describe('update_vod_statuses', function () { + describe('updateStalledVods', function () { + describe('integration', function () { + xit("Should fetch a list of VODs that haven't been updated in the past 2 minutes", function () { + // const + }) + }) + }) +}) \ No newline at end of file diff --git a/services/bot/src/tasks/update_vod_statuses.ts b/services/bot/src/tasks/update_vod_statuses.ts index 2c44d88..f8022a9 100644 --- a/services/bot/src/tasks/update_vod_statuses.ts +++ b/services/bot/src/tasks/update_vod_statuses.ts @@ -18,7 +18,7 @@ function assertPayload(payload: any): asserts payload is Payload { if (typeof payload.finished_minutes !== 'number') throw new Error(`finished_minutes parameter was not a number`); } -async function updateFinishedVods({ +export async function updateFinishedVods({ helpers, finished_minutes, url @@ -60,21 +60,22 @@ async function updateFinishedVods({ } } -async function updateStalledVods({ +export async function updateStalledVods({ helpers, url, stalled_minutes = 1, }: { helpers: Helpers, url: string, - stalled_minutes?: number, + stalled_minutes: number, }) { // Identify and update stalled vods + // note: we are checking /vods update_at which gets updated whenever a /segments gets updated (thanks to postgres trigger functions) const stalledTimestamp = sub(new Date(), { minutes: stalled_minutes }).toISOString(); const stalledQueryOptions = { - select: 'status,id,segments!inner(updated_at)', - 'segments.updated_at': `lt.${stalledTimestamp}`, - or: '(status.eq.pending_recording,status.eq.recording)', + 'select': 'status,id', + 'updated_at': `lt.${stalledTimestamp}`, + 'or': '(status.eq.pending_recording,status.eq.recording)', }; const stalledUpdatePayload = { status: 'stalled', @@ -100,6 +101,9 @@ async function updateStalledVods({ helpers.logger.error(stalledBody); return; } + // const stalledResHeaders = stalledRes.headers.get('') + // console.log(stalledRes) + // console.log(stalledResBody) } export const update_vod_statuses: Task = async function (payload: unknown, helpers: Helpers) { diff --git a/services/bot/tsconfig.json b/services/bot/tsconfig.json index 37be190..fbafd15 100644 --- a/services/bot/tsconfig.json +++ b/services/bot/tsconfig.json @@ -25,7 +25,7 @@ // Include the necessary files for your project "include": [ "**/*.ts" -, "src/events/interactionCreate.ts.old" ], +, "src/events/interactionCreate.ts.old", "../../packages/fetchers/createStreamInDatabase.ts", "../../packages/fetchers/createVod.ts", "../../packages/fetchers/findOrCreateStream.ts", "../../packages/fetchers/findOrCreateVtuber.spec.ts", "../../packages/fetchers/findOrCreateVtuber.ts", "../../packages/fetchers/findVod.ts", "../../packages/fetchers/getStreamFromDatabase.ts", "../../packages/fetchers/getStreamIdFromMessage.ts", "../../packages/fetchers/getUrlFromMessage.ts", "../../packages/fetchers/patchVod.ts" ], "exclude": [ "node_modules" ] diff --git a/services/capture/package.json b/services/capture/package.json index 7dd9322..df096d4 100644 --- a/services/capture/package.json +++ b/services/capture/package.json @@ -4,7 +4,7 @@ "version": "0.3.5", "license": "Unlicense", "private": true, - "packageManager": "pnpm@9.5.0", + "packageManager": "pnpm@9.6.0", "scripts": { "start": "node dist/index.js", "build": "tsup", @@ -21,6 +21,7 @@ "@aws-sdk/client-s3": "^3.637.0", "@aws-sdk/lib-storage": "^3.637.0", "@aws-sdk/types": "^3.609.0", + "@futureporn/fetchers": "workspace:^", "@futureporn/types": "workspace:^", "@futureporn/utils": "workspace:^", "@paralleldrive/cuid2": "^2.2.2", @@ -46,6 +47,7 @@ "https": "^1.0.0", "ioredis": "^5.4.1", "minimatch": "^10.0.1", + "nanoid": "^5.0.7", "p-retry": "^6.2.0", "pg-boss": "^10.1.1", "pino-pretty": "^11.2.2", diff --git a/services/capture/pnpm-lock.yaml b/services/capture/pnpm-lock.yaml index a12646e..8ca66dc 100644 --- a/services/capture/pnpm-lock.yaml +++ b/services/capture/pnpm-lock.yaml @@ -17,6 +17,9 @@ importers: '@aws-sdk/types': specifier: ^3.609.0 version: 3.609.0 + '@futureporn/fetchers': + specifier: workspace:^ + version: link:../../packages/fetchers '@futureporn/types': specifier: workspace:^ version: link:../../packages/types @@ -92,6 +95,9 @@ importers: minimatch: specifier: ^10.0.1 version: 10.0.1 + nanoid: + specifier: ^5.0.7 + version: 5.0.7 p-retry: specifier: ^6.2.0 version: 6.2.0 @@ -1951,6 +1957,11 @@ packages: nan@2.20.0: resolution: {integrity: sha512-bk3gXBZDGILuuo/6sKtr0DQmSThYHLtNCdSdXk9YkxD/jK6X2vmCyyXBBxyqZ4XcnzTyYEAThfX3DCEnLf6igw==} + nanoid@5.0.7: + resolution: {integrity: sha512-oLxFY2gd2IqnjcYyOXD8XGCftpGtZP2AbHbOkthDkvRywH5ayNtPVy9YlOPcHckXzbLTCHpkb7FB+yuxKV13pQ==} + engines: {node: ^18 || >=20} + hasBin: true + neotraverse@0.6.18: resolution: {integrity: sha512-Z4SmBUweYa09+o6pG+eASabEpP6QkQ70yHj351pQoEXIs8uHbaU2DWVmzBANKgflPa47A50PtB2+NgRpQvr7vA==} engines: {node: '>= 10'} @@ -4996,6 +5007,8 @@ snapshots: nan@2.20.0: {} + nanoid@5.0.7: {} + neotraverse@0.6.18: {} nise@5.1.9: diff --git a/services/capture/src/Record.ts b/services/capture/src/Record.ts index 77d826e..b6df091 100644 --- a/services/capture/src/Record.ts +++ b/services/capture/src/Record.ts @@ -1,10 +1,15 @@ import { spawn } from 'child_process'; -import { EventEmitter, PassThrough, pipeline, Readable } from 'stream'; -import prettyBytes from 'pretty-bytes'; +import { PassThrough, pipeline, Readable } from 'stream'; +import EventEmitter from 'events'; import { Upload } from "@aws-sdk/lib-storage"; -import { S3Client } from "@aws-sdk/client-s3"; +import { CompleteMultipartUploadCommand, S3Client } from "@aws-sdk/client-s3"; import 'dotenv/config' +import { promisify } from 'util'; +// @see https://nodejs.org/api/events.html#capture-rejections-of-promises +EventEmitter.captureRejections = true; + +const pipelinePromise = promisify(pipeline) const ua0 = 'Mozilla/5.0 (X11; Linux x86_64; rv:105.0) Gecko/20100101 Firefox/105.0' export class UploadStreamClosedError extends Error { @@ -49,6 +54,7 @@ export default class Record { date?: string; abortSignal: AbortSignal; onProgress: Function; + upload?: Upload; constructor({ inputStream, s3Client, bucket, jobId, abortSignal, onProgress }: RecordArgs) { if (!inputStream) throw new Error('Record constructor was missing inputStream.'); @@ -67,6 +73,7 @@ export default class Record { this.uploadStream = new PassThrough() this.abortSignal = abortSignal this.abortSignal.addEventListener("abort", this.abortEventListener.bind(this)) + this.upload } @@ -127,7 +134,7 @@ export default class Record { // greets https://stackoverflow.com/a/70159394/1004931 try { - const parallelUploads3 = new Upload({ + this.upload = new Upload({ client: this.s3Client, partSize: 1024 * 1024 * 5, queueSize: 1, @@ -135,8 +142,7 @@ export default class Record { params: target, }); - - parallelUploads3.on("httpUploadProgress", (progress) => { + this.upload.on("httpUploadProgress", (progress) => { if (progress?.loaded) { // console.log(progress) if (this.onProgress) this.onProgress(this.counter); @@ -146,15 +152,41 @@ export default class Record { } }); - console.log('Waiting for parallelUploads3 to finish...') - await parallelUploads3.done(); - console.log('parallelUploads3 is complete.') + console.log(`Uploading to bucket=${this.bucket}. Waiting for parallelUploads3 to finish...`) + // @todo there is a problem that results in COMPLETE LOSS OF SEGMENT DATA. + // when the download stream closes before the upload stream, I think the upload stream gets cut off. + // this means the upload isn't allowed to save and that means no data whatsoever gets put to S3. + // is that right? IDK what's happening, but we don't get any segment data on S3 at all?? + // Ok I just checked the Backblaze dashboard and we are uploading. Backblaze says the bytes are at 0 but + // it also shows a partial upload of 550MB which matches what capture-worker is showing has been captured so far. + // So I think what is happening is the upload is happening, but it's not finishing. + // It looks like the finish is only allowed to happen under completely normal circumstances. + // However, the segment upload may fail in production, and we need to let the upload finish even then. + // + // I think I need to call CompleteMultipartUpload. https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html + // Yes, that's right. Apparently parallelUploads3.done() returns a Promise which will resolve to CompleteMultipartUploadCommandOutput. + // But because of the catch, that promise will never resolve? + // What happens to an in-progress Promise when an error is thrown? + + // await this.upload.done(); + // console.log('Upload is complete.') + // return this.uploadStream + } catch (e) { // if we got an abort error, e.name is not AbortError as expected. Instead, e.name is Error. // so in order to catch AbortError, we don't even look there. instead, we check if our abortcontroller was aborted. // in other words, `(e.name === 'AbortError')` will never be true. - if (this.abortSignal.aborted) return; + if (this.abortSignal.aborted) { + console.error('While uploading, the upload was aborted.') + setTimeout(async () => { + await this.upload?.abort() + }, 1000) + // if (this.upload) { + // const command = new CompleteMultipartUploadCommand() + // } + return; + } if (e instanceof Error) { console.error(`We were uploading a file to S3 but then we encountered an exception!`) @@ -191,10 +223,6 @@ export default class Record { console.error('there was an error on the uploadStream. error as follows') console.error(e) }) - // T.M.I. - // this.uploadStream.on('drain', () => { - // console.info('[vvv] drain on uploadStream.') - // }) // input stream event handlers this.inputStream.on('close', () => { @@ -211,22 +239,20 @@ export default class Record { // pipe the ffmpeg stream to the S3 upload stream // this has the effect of uploading the stream to S3 at the same time we're recording it. - pipeline( + console.log(`>>> awaiting pipeline promise`) + const streamPromise = pipelinePromise( this.inputStream, - this.uploadStream, - (err) => { - if (err) { - console.error(`pipeline errored.`) - console.error(err) - } else { - console.log('pipeline succeeded.') - } - } + this.uploadStream ) - console.log('awaiting uploadToS3()...') - await this.uploadToS3() - console.log('uploadToS3() is complete.') + this.uploadToS3() + await Promise.all([streamPromise, this.upload?.done()]) + + // console.log('awaiting uploadToS3()...') + // await this.uploadToS3() + // console.log('uploadToS3() is complete.') + + console.log(`streamPromise completed with jobId=${this.jobId}, keyName=${this.keyName}`) return { jobId: this.jobId, diff --git a/services/capture/src/RecordNextGeneration.spec.ts b/services/capture/src/RecordNextGeneration.spec.ts new file mode 100644 index 0000000..8aed063 --- /dev/null +++ b/services/capture/src/RecordNextGeneration.spec.ts @@ -0,0 +1,15 @@ +import RecordNextGeneration from './RecordNextGeneration.ts' +import getVod from '@futureporn/fetchers/getVod.ts' +import createVod from '@futureporn/fetchers/createVod.ts' + +describe('RecordNextGeneration', function () { + describe('integration', function () { + it('should stream to S3', async function () { + this.timeout(30000) + const cv = await createVod() + if (!cv) throw new Error('failed to get vod from createVod()'); + const recordNG = new RecordNextGeneration({ url: 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-06-15.mp4', vodId: cv.id }) + const uv = await getVod(cv.id) + }) + }) +}) \ No newline at end of file diff --git a/services/capture/src/RecordNextGeneration.ts b/services/capture/src/RecordNextGeneration.ts new file mode 100644 index 0000000..b1225c5 --- /dev/null +++ b/services/capture/src/RecordNextGeneration.ts @@ -0,0 +1,213 @@ +import { VodResponse } from "@futureporn/types" +import getVod from '@futureporn/fetchers/getVod.ts' +import { PassThrough, Readable } from "stream" +import { ua0 } from '@futureporn/utils/name.ts' +import { spawn } from 'child_process' +import { pipeline } from "stream/promises" +import { configs } from "./config" +import { nanoid } from 'nanoid' +import { Upload } from "@aws-sdk/lib-storage" +import { S3Client, type S3ClientConfig } from '@aws-sdk/client-s3' + +export interface RecordNextGenerationArguments { + vodId: string; + url: string; +} + + +export default class RecordNextGeneration { + + public vodId: string; + public url: string; + private vod?: VodResponse|null; + private downloadStream?: Readable; + private uploadStream?: PassThrough; + private upload?: Upload; + private streamPipeline?: Promise; + + constructor({ vodId, url }: RecordNextGenerationArguments) { + this.vodId = vodId + this.url = url + + + + // const outputStream = createWriteStream('/dev/null') + // setInterval(() => { inputStream.push('simulated downloader bytes received') }, 50) + // setTimeout(() => { inputStream.destroy() }) + } + + static getMultipartUpload({ client, bucket, key, body }: { client: S3Client, bucket: string, key: string, body: Readable }) { + const params = { + Bucket: bucket, + Key: key, + Body: body + } + const upload = new Upload({ + client, + partSize: 1024 * 1024 * 5, + queueSize: 1, + leavePartsOnError: true, + params + }) + + upload.on("httpUploadProgress", (progress) => { + if (progress?.loaded) { + console.log(progress) + // if (this.onProgress) this.onProgress(this.counter); + // console.log(`uploaded ${progress.loaded} bytes (${prettyBytes(progress.loaded)})`); + } else { + console.log(`httpUploadProgress ${JSON.stringify(progress, null, 2)}`) + } + }); + + return upload + } + + // static deleteme () { + + // // @todo there is a problem that results in COMPLETE LOSS OF SEGMENT DATA. + // // when the download stream closes before the upload stream, I think the upload stream gets cut off. + // // this means the upload isn't allowed to save and that means no data whatsoever gets put to S3. + // // is that right? IDK what's happening, but we don't get any segment data on S3 at all?? + // // Ok I just checked the Backblaze dashboard and we are uploading. Backblaze says the bytes are at 0 but + // // it also shows a partial upload of 550MB which matches what capture-worker is showing has been captured so far. + // // So I think what is happening is the upload is happening, but it's not finishing. + // // It looks like the finish is only allowed to happen under completely normal circumstances. + // // However, the segment upload may fail in production, and we need to let the upload finish even then. + // // + // // I think I need to call CompleteMultipartUpload. https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html + // // Yes, that's right. Apparently parallelUploads3.done() returns a Promise which will resolve to CompleteMultipartUploadCommandOutput. + // // But because of the catch, that promise will never resolve? + // // What happens to an in-progress Promise when an error is thrown? + + // // await this.upload.done(); + // // console.log('Upload is complete.') + // // return this.uploadStream + + + // } catch (e) { + // // if we got an abort error, e.name is not AbortError as expected. Instead, e.name is Error. + // // so in order to catch AbortError, we don't even look there. instead, we check if our abortcontroller was aborted. + // // in other words, `(e.name === 'AbortError')` will never be true. + // if (this.abortSignal.aborted) { + // console.error('While uploading, the upload was aborted.') + // setTimeout(async () => { + // await this.upload?.abort() + // }, 1000) + // // if (this.upload) { + // // const command = new CompleteMultipartUploadCommand() + // // } + // return; + // } + + // if (e instanceof Error) { + // console.error(`We were uploading a file to S3 but then we encountered an exception!`) + // console.error(e) + // throw e + // } else { + // throw new Error(`error of some sort ${JSON.stringify(e, null, 2)}`) + // } + // } + + // } + + + static getFFmpegStream({ url }: { url: string }): Readable { + console.log(`getFFmpegStream using url=${url}`) + const ffmpegProc = spawn('ffmpeg', [ + '-headers', `"User-Agent: ${ua0}"`, + '-i', url, + '-c:v', 'copy', + '-c:a', 'copy', + '-movflags', 'faststart', + '-y', + '-f', 'mpegts', + '-loglevel', 'quiet', + 'pipe:1' + ], { + // ignoring stderr is important because if not, ffmpeg will fill that buffer and node will hang + stdio: ['pipe', 'pipe', 'ignore'] + }) + return ffmpegProc.stdout + } + + + /** + * done() waits for the recording to be complete. + */ + async done() { + + // @todo [x] get the vod + // @todo [ ] get the download stream + // @todo [ ] pipe the download stream to the upload stream + // @todo [ ] start the upload(?) + // @todo [ ] await all the promised resolutions + // @todo [ ] handle errors + // @todo [ ] cleanup + // @todo - [ ] send S3 complete upload command if necessary + + try { + this.vod = await getVod(this.vodId) + if (!this.vod) throw new Error(`RecordNextGeneration failed to fetch vod ${this.vodId}`) + if (this.vod.is_recording_aborted) { + console.info(`vod ${this.vodId} is aborted.`) + return + } + + const clientOptions: S3ClientConfig = { + endpoint: configs.s3Endpoint, + region: configs.s3Region, + credentials: { + accessKeyId: configs.s3AccessKeyId, + secretAccessKey: configs.s3SecretAccessKey, + }, + } + const client = new S3Client(clientOptions) + const bucket = configs.s3UscBucket + const key = `@todo-${nanoid()}` + this.downloadStream = RecordNextGeneration.getFFmpegStream({ url: this.url }) + this.uploadStream = new PassThrough() + this.upload = RecordNextGeneration.getMultipartUpload({ client, bucket: configs.s3UscBucket, key, body: this.downloadStream }) + + this.streamPipeline = pipeline(this.downloadStream, this.uploadStream) + + return Promise.all([ + this.streamPipeline, + this.upload.done() + ]) + + } catch (e) { + + switch (e) { + + + case (e instanceof Error && e.name === 'RoomOfflineError'): + // if the room is offline, we re-throw the RoomOfflineError so the recording gets retried + // we do this because the offline might be a temporary situation. + // e.g. streamer's computer bluescreened and they're coming back after they reboot. + throw e + + + case (e instanceof Error && e.name === 'AbortError'): + // An admin aborted which means we don't want to retry. + // we return and the Task gets marked as successful. + return + + + default: + console.error(`!!!!!!!!!!!!!! switch/case defaulted which should probably never happen. Please patch the code to handle this scenario.`) + console.error(`!!!!!!!!!!!!!! switch/case defaulted which should probably never happen. Please patch the code to handle this scenario.`) + console.error(`!!!!!!!!!!!!!! switch/case defaulted which should probably never happen. Please patch the code to handle this scenario.`) + console.error((e instanceof Error) ? e.message : JSON.stringify(e)) + } + + + + } finally { + // @todo cleanup + console.info(`finally block is running now.`) + console.info('@todo cleanup') + } + } + +} \ No newline at end of file diff --git a/services/capture/src/app.spec.ts b/services/capture/src/app.spec.ts index ba6be8a..2efd5f5 100644 --- a/services/capture/src/app.spec.ts +++ b/services/capture/src/app.spec.ts @@ -1,11 +1,9 @@ 'use strict' import { build } from './app.ts' -import chai, { expect } from "chai" +import { use, expect } from "chai" import sinonChai from 'sinon-chai' -import sinon from 'sinon' -import { makeWorkerUtils } from 'graphile-worker' -chai.use(sinonChai) +use(sinonChai) describe('app', function () { const app = build({}, 'postgres://') diff --git a/services/capture/src/config.ts b/services/capture/src/config.ts index d522961..e4426ec 100644 --- a/services/capture/src/config.ts +++ b/services/capture/src/config.ts @@ -6,7 +6,7 @@ const requiredEnvVars = [ 'S3_SECRET_ACCESS_KEY', 'S3_REGION', 'S3_ENDPOINT', - 'S3_BUCKET', + 'S3_USC_BUCKET', 'POSTGREST_URL', 'AUTOMATION_USER_JWT', ] as const; @@ -26,7 +26,7 @@ export interface Config { s3AccessKeyId: string; s3SecretAccessKey: string; s3Region: string; - s3Bucket: string; + s3UscBucket: string; s3Endpoint: string; } @@ -38,6 +38,6 @@ export const configs: Config = { s3AccessKeyId: getEnvVar('S3_ACCESS_KEY_ID'), s3SecretAccessKey: getEnvVar('S3_SECRET_ACCESS_KEY'), s3Region: getEnvVar('S3_REGION'), - s3Bucket: getEnvVar('S3_BUCKET'), + s3UscBucket: getEnvVar('S3_USC_BUCKET'), s3Endpoint: getEnvVar('S3_ENDPOINT'), } \ No newline at end of file diff --git a/services/capture/src/poc.ts b/services/capture/src/poc.ts index 6ff5a0a..e59242f 100644 --- a/services/capture/src/poc.ts +++ b/services/capture/src/poc.ts @@ -49,7 +49,6 @@ async function main() { const playlistUrl: string = await new Promise((resolve, reject) => { - // get the m3u8 playlist for the livestream const ytdlp = spawn('yt-dlp', [ '-g', randomRoom.url ]) let output = '' diff --git a/services/capture/src/tasks/record.ts b/services/capture/src/tasks/record.ts index f00d002..1fdb199 100644 --- a/services/capture/src/tasks/record.ts +++ b/services/capture/src/tasks/record.ts @@ -1,15 +1,16 @@ -import updateSegmentInDatabase from '../fetchers/updateSegmentInDatabase.ts' +import updateSegmentInDatabase from '@futureporn/fetchers/updateSegmentInDatabase.ts' import { Helpers, type Task } from 'graphile-worker' import Record from '../Record.ts' -import type { SegmentResponse, ScoutResponse } from '@futureporn/types' +import type { SegmentResponse } from '@futureporn/types' import { configs } from '../config.ts' import { createId } from '@paralleldrive/cuid2' -import createSegmentInDatabase from '../fetchers/createSegmentInDatabase.ts' -import createSegmentsVodLink from '../fetchers/createSegmentsVodLink.ts' -import getPlaylistUrl from '../fetchers/getPlaylistUrl.ts' -import { String } from 'aws-sdk/clients/acm' +import createSegmentInDatabase from '@futureporn/fetchers/createSegmentInDatabase.ts' +import createSegmentsVodLink from '@futureporn/fetchers/createSegmentsVodLink.ts' +import getPlaylistUrl from '@futureporn/fetchers/getPlaylistUrl.ts' +import getVod from '@futureporn/fetchers/getVod.ts' +import RecordNextGeneration from '../RecordNextGeneration.ts' /** * url is the URL to be recorded. Ex: chaturbate.com/projektmelody @@ -30,40 +31,39 @@ function assertPayload(payload: any): asserts payload is Payload { } -async function getRecordInstance(url: string, segment_id: string, helpers: Helpers) { - helpers.logger.info(`getRecordInstance() with url=${url}, segment_id=${segment_id}`) - const abortController = new AbortController() - const abortSignal = abortController.signal - const accessKeyId = configs.s3AccessKeyId; - const secretAccessKey = configs.s3SecretAccessKey; - const region = configs.s3Region; - const endpoint = configs.s3Endpoint; - const bucket = configs.s3Bucket; - const playlistUrl = await getPlaylistUrl(url) - if (!playlistUrl) throw new Error('failed to getPlaylistUrl'); - helpers.logger.info(`playlistUrl=${playlistUrl}`) - const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint }) - const inputStream = Record.getFFmpegStream({ url: playlistUrl }) - const onProgress = (fileSize: number) => { - updateSegmentInDatabase({ segment_id, fileSize, helpers }) - .then(checkIfAborted) - .then((isAborted) => { - isAborted ? abortController.abort() : null - }) - .catch((e) => { - helpers.logger.error('caught error while updatingDatabaseRecord inside onProgress inside getRecordInstance') - helpers.logger.error(e) - }) - } - const record = new Record({ inputStream, onProgress, bucket, s3Client, jobId: ''+segment_id, abortSignal }) - return record -} +// async function getRecordInstance(url: string, segment_id: string, helpers: Helpers) { +// helpers.logger.info(`getRecordInstance() with url=${url}, segment_id=${segment_id}`) +// const abortController = new AbortController() +// const abortSignal = abortController.signal +// const accessKeyId = configs.s3AccessKeyId; +// const secretAccessKey = configs.s3SecretAccessKey; +// const region = configs.s3Region; +// const endpoint = configs.s3Endpoint; +// const bucket = configs.s3UscBucket; +// const playlistUrl = await getPlaylistUrl(url) +// if (!playlistUrl) throw new Error('failed to getPlaylistUrl'); +// helpers.logger.info(`playlistUrl=${playlistUrl}`) +// const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint }) +// const inputStream = Record.getFFmpegStream({ url: playlistUrl }) +// const onProgress = (fileSize: number) => { +// updateSegmentInDatabase({ segment_id, fileSize, helpers }) +// .then(checkIfAborted) +// .then((isAborted) => { +// isAborted ? abortController.abort() : null +// }) +// .catch((e) => { +// helpers.logger.error('caught error while updatingDatabaseRecord inside onProgress inside getRecordInstance') +// helpers.logger.error(e) +// }) +// } -function checkIfAborted(segment: Partial): boolean { - // console.log(`checkIfAborted with following segment`) - // console.log(segment) - return (!!segment?.vod?.is_recording_aborted) -} +// const record = new Record({ inputStream, onProgress, bucket, s3Client, jobId: ''+segment_id, abortSignal }) +// return record +// } + +// function checkIfAborted(segment: Partial): boolean { +// return (!!segment?.vod?.is_recording_aborted) +// } @@ -79,16 +79,19 @@ function checkIfAborted(segment: Partial): boolean { * * This function also names the S3 file (s3_key) with a datestamp and a cuid. */ -const doRecordSegment = async function doRecordSegment(url: string, vod_id: string, helpers: Helpers) { - const s3_key = `${new Date().toISOString()}-${createId()}.ts` - helpers.logger.info(`let's create a segment using vod_id=${vod_id}, url=${url}`) - const segment_id = await createSegmentInDatabase(s3_key, vod_id, helpers) - helpers.logger.info(`let's create a segmentsStreamLink...`) - const segmentsVodLinkId = await createSegmentsVodLink(vod_id, segment_id, helpers) - helpers.logger.info(`doTheRecording with createSegmentsVodLink segmentsVodLinkId=${segmentsVodLinkId}, vod_id=${vod_id}, segment_id=${segment_id}, url=${url}`) - const record = await getRecordInstance(url, segment_id, helpers) - return record.start() -} +// const doRecordSegment = async function doRecordSegment(url: string, vod_id: string, helpers: Helpers) { +// const s3_key = `${new Date().toISOString()}-${createId()}.ts` +// helpers.logger.info(`let's create a segment using vod_id=${vod_id}, url=${url}`) +// const segment_id = await createSegmentInDatabase(s3_key, vod_id) +// helpers.logger.info(`let's create a segmentsStreamLink...`) +// const segmentsVodLinkId = await createSegmentsVodLink(vod_id, segment_id) +// helpers.logger.info(`doTheRecording with createSegmentsVodLink segmentsVodLinkId=${segmentsVodLinkId}, vod_id=${vod_id}, segment_id=${segment_id}, url=${url}`) +// // no try-catch block here, because we need any errors to bubble up. +// const record = await getRecordInstance(url, segment_id) +// helpers.logger.info(`we got a Record instance. now we record.start()`) +// // console.log(record) +// return record.start() +// } @@ -98,42 +101,60 @@ export const record: Task = async function (payload: unknown, helpers: Helpers) assertPayload(payload) const { url, vod_id } = payload const vodId = vod_id - try { - /** - * We do an exponential backoff timer when we record. If the Record() instance throws an error, we try again after a delay. - * This will take effect only when Record() throws an error. - * If however Record() returns, as is the case when the stream ends, this backoff timer will not retry. - * This does not handle the corner case where the streamer's internet temporarliy goes down, and their stream drops. - * - * @todo We must implement retrying at a higher level, and retry a few times to handle this type of corner-case. - */ - // await backOff(() => doRecordSegment(url, recordId, helpers)) - await doRecordSegment(url, vodId, helpers) - } catch (e) { - // await updateDatabaseRecord({ recordId: vod_id, recordingState: 'failed' }) - helpers.logger.error(`caught an error during record Task`) - if (e instanceof Error) { - helpers.logger.info(`error.name=${e.name}`) - if (e.name === 'RoomOfflineError') { - // If room is offline, we want to retry until graphile-worker retries expire. - // We don't want to swallow the error so we simply log the error then let the below throw re-throw the error - // graphile-worker will retry when we re-throw the error below. - helpers.logger.info(`Room is offline.`) - } else if (e.name === 'AbortError') { - // If the recording was aborted by an admin, we want graphile-worker to stop retrying the record job. - // We swallow the error and return in order to mark the job as succeeded. - helpers.logger.info(`>>> we got an AbortError so we are ending the record job.`) - return - } else { - helpers.logger.error(e.message) - } - } else { - helpers.logger.error(JSON.stringify(e)) - } - // we throw the error which fails the graphile-worker job, thus causing graphile-worker to restart/retry the job. - helpers.logger.error(`we got an error during record Task so we throw and retry`) - throw e - } + + /** + * RecordNextGeneration handles errors for us and re-throws ones that should fail the Task. + * We intentionally do not use a try/catch block here. + */ + const recordNG = new RecordNextGeneration({ url, vodId }) + await recordNG.done() + + + + return; + + + // try { + // // if the VOD has been aborted, end Task with success + // if ((await getVod(vod_id, helpers))?.is_recording_aborted) return; + + // /** + // * We do an exponential backoff timer when we record. If the Record() instance throws an error, we try again after a delay. + // * This will take effect only when Record() throws an error. + // * If however Record() returns, as is the case when the stream ends, this backoff timer will not retry. + // * This does not handle the corner case where the streamer's internet temporarliy goes down, and their stream drops. + // * + // * @todo We must implement retrying at a higher level, and retry a few times to handle this type of corner-case. + // */ + // // await backOff(() => doRecordSegment(url, recordId, helpers)) + // await doRecordSegment(url, vodId, helpers) + // } catch (e) { + // // await updateDatabaseRecord({ recordId: vod_id, recordingState: 'failed' }) + // helpers.logger.error(`caught an error during record Task`) + // if (e instanceof Error) { + // helpers.logger.info(`error.name=${e.name}`) + // if (e.name === 'RoomOfflineError') { + // // If room is offline, we want to retry until graphile-worker retries expire. + // // We don't want to swallow the error so we simply log the error then let the below throw re-throw the error + // // graphile-worker will retry when we re-throw the error below. + // helpers.logger.info(`Room is offline.`) + // } else if (e.name === 'AbortError') { + // // If the recording was aborted by an admin, we want graphile-worker to stop retrying the record job. + // // We swallow the error and return in order to mark the job as succeeded. + // helpers.logger.info(`>>> we got an AbortError so we are ending the record job.`) + // return + // } else { + // helpers.logger.error(e.message) + // } + // } else { + // helpers.logger.error(JSON.stringify(e)) + // } + // // we throw the error which fails the graphile-worker job, thus causing graphile-worker to restart/retry the job. + // helpers.logger.error(`we got an error during record Task so we throw and retry`) + // throw e + // } + + // helpers.logger.info('record Task has finished') } diff --git a/services/factory/package.json b/services/factory/package.json index 6068b73..f39dede 100644 --- a/services/factory/package.json +++ b/services/factory/package.json @@ -22,6 +22,7 @@ "dependencies": { "@aws-sdk/client-s3": "^3.637.0", "@aws-sdk/lib-storage": "^3.637.0", + "@futureporn/fetchers": "workspace:^", "@futureporn/storage": "workspace:^", "@futureporn/utils": "workspace:^", "@paralleldrive/cuid2": "^2.2.2", diff --git a/services/factory/pnpm-lock.yaml b/services/factory/pnpm-lock.yaml index 9420803..74878ba 100644 --- a/services/factory/pnpm-lock.yaml +++ b/services/factory/pnpm-lock.yaml @@ -14,6 +14,9 @@ importers: '@aws-sdk/lib-storage': specifier: ^3.637.0 version: 3.637.0(@aws-sdk/client-s3@3.637.0) + '@futureporn/fetchers': + specifier: workspace:^ + version: link:../../packages/fetchers '@futureporn/storage': specifier: workspace:^ version: link:../../packages/storage diff --git a/services/factory/src/tasks/combine_video_segments.ts b/services/factory/src/tasks/combine_video_segments.ts index d08d34e..ad53da9 100644 --- a/services/factory/src/tasks/combine_video_segments.ts +++ b/services/factory/src/tasks/combine_video_segments.ts @@ -13,9 +13,10 @@ import { createReadStream, createWriteStream, write } from 'node:fs'; import { writeFile, readFile } from 'node:fs/promises'; import { tmpdir } from 'node:os'; import { promisify } from 'node:util'; -import patchVodInDatabase from '../fetchers/patchVodInDatabase' +import patchVodInDatabase from '@futureporn/fetchers/patchVodInDatabase.ts' import { downloadFile } from '@futureporn/storage/s3.ts'; -import { S3FileRecord } from '@futureporn/types'; +import { S3FileRecord, VodRecord } from '@futureporn/types'; +const pipelinePromise = promisify(pipeline) interface s3ManifestEntry { key: string; @@ -108,20 +109,6 @@ const concatVideos = async function (videoFilePaths: string[]): Promise -const setupUploadPipeline = function ({ inputStream, uploadStream }: { inputStream: Readable, uploadStream: PassThrough }) { - pipeline( - inputStream, - uploadStream, - (err: any) => { - if (err) { - console.error(`upload pipeline errored.`) - console.error(err) - } else { - console.log('upload pipeline succeeded.') - } - } - ) -} const getS3ParallelUpload = async function ({ filePath, @@ -138,7 +125,7 @@ const getS3ParallelUpload = async function ({ const uploadStream = new PassThrough() const target: S3Target = { - Bucket: configs.s3Bucket, + Bucket: configs.s3UscBucket, Key: s3KeyName, Body: uploadStream } @@ -165,7 +152,7 @@ export const combine_video_segments: Task = async function (payload: unknown, he assertPayload(payload) const { s3_manifest, vod_id } = payload if (!vod_id) throw new Error('combine_video_segments was called without a vod_id.'); - helpers.logger.info(`combine_video_segments started with s3_manifest=${JSON.stringify(s3_manifest)}, vod_id=${vod_id}`) + helpers.logger.info(`🏗️ combine_video_segments started with s3_manifest=${JSON.stringify(s3_manifest)}, vod_id=${vod_id}`) /** * Here we take a manifest of S3 files and we download each of them. @@ -198,17 +185,13 @@ export const combine_video_segments: Task = async function (payload: unknown, he const inputStream = createReadStream(concatenatedVideoFile) const filePath = concatenatedVideoFile const { uploadStream, upload } = await getS3ParallelUpload({ client, s3KeyName, filePath }) - setupUploadPipeline({ inputStream, uploadStream }) + pipelinePromise(inputStream, uploadStream) await upload.done() if (!vod_id) throw new Error('vod_id was missing from payload'); // update the vod with the s3_file of the combined video - const s3File: S3FileRecord = { - s3_key: s3KeyName, - bucket: configs.s3UscBucket, - } const payload = { - s3_file: s3File, + s3_file: s3KeyName, vod_id } await patchVodInDatabase(vod_id, payload) diff --git a/services/factory/src/tasks/generate_thumbnail.ts b/services/factory/src/tasks/generate_thumbnail.ts index c06b52f..09b5717 100644 --- a/services/factory/src/tasks/generate_thumbnail.ts +++ b/services/factory/src/tasks/generate_thumbnail.ts @@ -1,8 +1,8 @@ import type { Task, Helpers } from "graphile-worker"; -import getVod from "../fetchers/getVod"; +import getVod from "@futureporn/fetchers/getVod"; import { getStoryboard } from '@futureporn/utils/image.ts' import { getCdnUrl, uploadFile, s3CdnMap, type S3FileArgs } from '@futureporn/storage/s3.ts' -import patchVodInDatabase from "../fetchers/patchVodInDatabase"; +import patchVodInDatabase from "@futureporn/fetchers/patchVodInDatabase"; import { configs } from "../config"; import { basename } from "path"; @@ -21,14 +21,14 @@ export const generate_thumbnail: Task = async function (payload: unknown, helper assertPayload(payload) const { vod_id } = payload if (!vod_id) throw new Error('generate_thumbnail Task was started without a vod_id. This is unsupported.'); - helpers.logger.info(`generate_thumbnail started with vod_id=${vod_id}`); + helpers.logger.info(`🏗️ generate_thumbnail started with vod_id=${vod_id}`); const vod = await getVod(vod_id, helpers) const s3_file = vod?.s3_file if (!s3_file) throw new Error(`vod ${vod_id} was missing a s3_file.`); // we need to get a CDN url to the vod so we can download chunks of the file in order for Prevvy to create the storyboard image. - const cdnUrl = getCdnUrl(configs.s3Bucket, s3_file.s3_key) + const cdnUrl = getCdnUrl(configs.s3MainBucket, s3_file.s3_key) const tmpImagePath = await getStoryboard(cdnUrl) @@ -38,7 +38,7 @@ export const generate_thumbnail: Task = async function (payload: unknown, helper filePath: tmpImagePath, s3AccessKeyId: configs.s3AccessKeyId, s3SecretAccessKey: configs.s3SecretAccessKey, - s3BucketName: configs.s3Bucket, + s3BucketName: configs.s3MainBucket, s3Endpoint: configs.s3Region, s3Region: configs.s3Region } @@ -47,7 +47,7 @@ export const generate_thumbnail: Task = async function (payload: unknown, helper if (!upload.Key) throw new Error(`failed to upload ${tmpImagePath} to S3 (upload.Key was missing)`); // we need to create a S3 file in the db - const thumbnail = getCdnUrl(configs.s3Bucket, upload.Key) + const thumbnail = getCdnUrl(configs.s3MainBucket, upload.Key) await patchVodInDatabase(vod_id, { thumbnail }) diff --git a/services/factory/src/tasks/process_video.ts b/services/factory/src/tasks/process_video.ts index 0dda587..cc36b66 100644 --- a/services/factory/src/tasks/process_video.ts +++ b/services/factory/src/tasks/process_video.ts @@ -1,8 +1,8 @@ import type { Helpers, Task } from "graphile-worker" import { configs } from "../config" import type { Stream } from '@futureporn/types' -import createVod from "../fetchers/createVod" -import getVod from "../fetchers/getVod" +import createVod from "@futureporn/fetchers/createVod" +import getVod from "@futureporn/fetchers/getVod" interface Payload { vod_id: string; @@ -43,23 +43,24 @@ function assertPayload(payload: any): asserts payload is Payload { const process_video: Task = async function (payload: unknown, helpers: Helpers) { assertPayload(payload) const { vod_id } = payload - helpers.logger.info(`process_video task has begun for vod_id=${vod_id}`) + helpers.logger.info(`🏗️ process_video task has begun for vod_id=${vod_id}`) const vod = await getVod(vod_id, helpers) if (!vod) throw new Error(`failed to get vod from database.`); if (!vod.segments) throw new Error(`vod ${vod_id} fetched from database lacked any segments.`); + const maxAttempts = 6 const isCombinationNeeded = (vod.segments.length > 1) if (isCombinationNeeded) { const s3_manifest = vod.segments.map((segment) => ({ key: segment.s3_key, bytes: segment.bytes })) helpers.logger.info(`There are ${vod.segments.length} segments; Concatenation is needed.`) - helpers.addJob('combine_video_segments', { s3_manifest, vod_id }) + helpers.addJob('combine_video_segments', { s3_manifest, vod_id }, { maxAttempts }) } else { - helpers.addJob('remux_video', { vod_id }) + helpers.addJob('remux_video', { vod_id }, { maxAttempts }) } - helpers.addJob('generate_thumbnail', { vod_id }) + helpers.addJob('generate_thumbnail', { vod_id }, { maxAttempts }) // helpers.addJob('queue_moderator_review', { }) // helpers.addJob('create_mux_asset', { }) // helpers.addJob('create_torrent', { }) diff --git a/services/factory/src/tasks/remux_video.ts b/services/factory/src/tasks/remux_video.ts index 8bee75e..c1e8337 100644 --- a/services/factory/src/tasks/remux_video.ts +++ b/services/factory/src/tasks/remux_video.ts @@ -1,11 +1,12 @@ import type { Helpers, Task } from "graphile-worker" import { configs } from "../config" -import getVod from "../fetchers/getVod" +import getVod from "@futureporn/fetchers/getVod" import { downloadFile, uploadFile, type S3FileArgs } from "@futureporn/storage/s3.ts" import { remux } from '@futureporn/utils/video.ts' import { getTmpFile } from "@futureporn/utils/file.ts" import { basename } from "node:path" -import patchVodInDatabase from "../fetchers/patchVodInDatabase" +import patchVodInDatabase from "@futureporn/fetchers/patchVodInDatabase" +import { S3Client, S3ClientConfig } from "@aws-sdk/client-s3" interface Payload { vod_id: string; @@ -28,7 +29,7 @@ function assertPayload(payload: any): asserts payload is Payload { const remux_video: Task = async function (payload: unknown, helpers: Helpers) { assertPayload(payload) const { vod_id } = payload - helpers.logger.info(`remux_video task has begun for vod_id=${vod_id}`) + helpers.logger.info(`🏗️ remux_video task has begun for vod_id=${vod_id}`) const vod = await getVod(vod_id, helpers) @@ -46,15 +47,16 @@ const remux_video: Task = async function (payload: unknown, helpers: Helpers) { } const tmpFilePath = getTmpFile(segmentFileName) - const downloadArgs: S3FileArgs = { - filePath: tmpFilePath, - s3AccessKeyId: configs.s3AccessKeyId, - s3SecretAccessKey: configs.s3SecretAccessKey, - s3BucketName: configs.s3Bucket, - s3Endpoint: configs.s3Region, - s3Region: configs.s3Region + const s3ClientConfig: S3ClientConfig = { + credentials: { + accessKeyId: configs.s3AccessKeyId, + secretAccessKey: configs.s3SecretAccessKey, + }, + endpoint: configs.s3Endpoint, + region: configs.s3Region, } - await downloadFile(downloadArgs) + const client = new S3Client(s3ClientConfig) + await downloadFile(client, configs.s3UscBucket, segmentFileName) helpers.logger.info('Remuxing the video') const outputVideoPath = getTmpFile(`${basename(segmentFileName, '.ts')}.mp4`) @@ -65,7 +67,7 @@ const remux_video: Task = async function (payload: unknown, helpers: Helpers) { filePath: outputVideoPath, s3AccessKeyId: configs.s3AccessKeyId, s3SecretAccessKey: configs.s3SecretAccessKey, - s3BucketName: configs.s3Bucket, + s3BucketName: configs.s3MainBucket, s3Endpoint: configs.s3Region, s3Region: configs.s3Region } @@ -76,4 +78,6 @@ const remux_video: Task = async function (payload: unknown, helpers: Helpers) { } + + export default remux_video; diff --git a/services/scout/src/scrapeVtuberData.ts b/services/scout/src/scrapeVtuberData.ts index abb17fe..706cf17 100644 --- a/services/scout/src/scrapeVtuberData.ts +++ b/services/scout/src/scrapeVtuberData.ts @@ -1,7 +1,7 @@ import type { VtuberDataScrape } from "@futureporn/types" import { fetchHtml, getBroadcasterDisplayName, getInitialRoomDossier } from './cb.ts' import { getAccountData, usernameRegex } from "./fansly.ts" -import { fpSlugify } from "@futureporn/utils" +import { fpSlugify } from "@futureporn/utils/name.ts" /** diff --git a/services/scout/src/temporal.activities.js b/services/scout/src/temporal.activities.js deleted file mode 100644 index 682e49d..0000000 --- a/services/scout/src/temporal.activities.js +++ /dev/null @@ -1,3 +0,0 @@ -export async function greet(name) { - return `Hello, ${name}!`; -} \ No newline at end of file diff --git a/services/scout/src/temporal.client.js b/services/scout/src/temporal.client.js deleted file mode 100644 index 0ddd4cb..0000000 --- a/services/scout/src/temporal.client.js +++ /dev/null @@ -1,42 +0,0 @@ -import { Client, Connection } from '@temporalio/client'; - -import { example } from './temporal.workflow.js'; -import { createId } from '@paralleldrive/cuid2'; - -async function run() { - // const cert = await fs.readFile('./path-to/your.pem'); - // const key = await fs.readFile('./path-to/your.key'); - - let connectionOptions = { - address: 'temporal-frontend.futureporn.svc.cluster.local', - }; - - const connection = await Connection.connect(connectionOptions); - - const client = new Client({ - connection, - namespace: 'futureporn', - }); - - console.log('>>> WE ARE RUNNING THE WORKFLOW!!!!') - - const handle = await client.workflow.start(example, { - taskQueue: 'hello-world', - // type inference works! args: [name: string] - args: ['Temporal'], - // in practice, use a meaningful business ID, like customerId or transactionId - workflowId: 'workflow-' + createId(), - }); - console.log(`Started workflow ${handle.workflowId}`); - - // optional: wait for client result - console.log(await handle.result()); // Hello, Temporal! - - await client.connection.close(); - -} - -run().catch((err) => { - console.error(err); - process.exit(1); -}); \ No newline at end of file diff --git a/services/scout/src/temporal.worker.js b/services/scout/src/temporal.worker.js deleted file mode 100644 index f608e1c..0000000 --- a/services/scout/src/temporal.worker.js +++ /dev/null @@ -1,38 +0,0 @@ -import { NativeConnection, Worker } from '@temporalio/worker' -import * as activities from './temporal.activities.js' -import path from 'node:path' - -async function run() { - // Step 1: Establish a connection with Temporal server. - // - // Worker code uses `@temporalio/worker.NativeConnection`. - // (But in your application code it's `@temporalio/client.Connection`.) - const connection = await NativeConnection.connect({ - address: 'temporal-frontend.futureporn.svc.cluster.local', - // TLS and gRPC metadata configuration goes here. - }); - // Step 2: Register Workflows and Activities with the Worker. - const worker = await Worker.create({ - connection, - namespace: 'futureporn', - taskQueue: 'hello-world', - // Workflows are registered using a path as they run in a separate JS context. - workflowsPath: path.join(import.meta.dirname, './temporal.workflow.js'), - activities, - }); - - // Step 3: Start accepting tasks on the `hello-world` queue - // - // The worker runs until it encounters an unexpected error or the process receives a shutdown signal registered on - // the SDK Runtime object. - // - // By default, worker logs are written via the Runtime logger to STDERR at INFO level. - // - // See https://typescript.temporal.io/api/classes/worker.Runtime#install to customize these defaults. - await worker.run(); -} - -run().catch((err) => { - console.error(err); - process.exit(1); -}) \ No newline at end of file diff --git a/services/scout/src/temporal.workflow.js b/services/scout/src/temporal.workflow.js deleted file mode 100644 index 74f2853..0000000 --- a/services/scout/src/temporal.workflow.js +++ /dev/null @@ -1,10 +0,0 @@ -import { proxyActivities } from '@temporalio/workflow'; - -const { greet } = proxyActivities({ - startToCloseTimeout: '1 minute', -}); - -/** A workflow that simply calls an activity */ -export async function example(name) { - return await greet(name); -} \ No newline at end of file diff --git a/services/scout/src/ytdlp.ts b/services/scout/src/ytdlp.ts index 912b5ca..edda0f3 100644 --- a/services/scout/src/ytdlp.ts +++ b/services/scout/src/ytdlp.ts @@ -31,7 +31,7 @@ export class RoomOfflineError extends Error { export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = 0): Promise { - console.log(`getPlaylistUrl roomUrl=${roomUrl}, proxy=${false}, retries=${retries}`) + console.log(`getPlaylistUrl roomUrl=${roomUrl} proxy=${false} retries=${retries}`) let args = ['-g', roomUrl] if (proxy) { console.log(`proxy=${proxy}, HTTP_PROXY=${configs.httpProxy}`) @@ -49,7 +49,7 @@ export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = else throw new ExhaustedRetriesError() } else if (code === 0 && output.match(/https:\/\/.*\.m3u8/)) { // this must be an OK result with a playlist - return output + return output.trim() } else if (code === 1 && output.match(/Room is currently offline/)) { throw new RoomOfflineError() } else { diff --git a/services/strapi/package.json b/services/strapi/package.json index 59bf883..5a5f7b3 100644 --- a/services/strapi/package.json +++ b/services/strapi/package.json @@ -46,6 +46,6 @@ "node": "20.x.x", "npm": ">=6.0.0" }, - "packageManager": "pnpm@9.5.0", + "packageManager": "pnpm@9.6.0", "license": "MIT" }