From 4c71649c7680db8e82772dc192e4785b8a8ff778 Mon Sep 17 00:00:00 2001 From: CJ_Clippy Date: Wed, 2 Oct 2024 09:38:24 -0800 Subject: [PATCH] @futureporn/scout retries --- Tiltfile | 22 +- .../fp/templates/{factory.yaml => build.yaml} | 23 +- charts/fp/templates/capture.yaml | 41 +- charts/fp/values.yaml | 10 +- .../{factory.dockerfile => build.dockerfile} | 20 +- packages/fetchers/src/createSegment.ts | 2 +- packages/fetchers/src/getPlaylistUrl.ts | 22 +- .../fetchers/src/updateSegmentInDatabase.ts | 9 +- packages/storage/src/s3.ts | 2 +- packages/types/src/index.ts | 11 + packages/utils/src/error.ts | 76 +++ packages/utils/src/file.spec.ts | 9 +- packages/utils/src/file.ts | 19 +- packages/utils/src/image.spec.ts | 2 +- scripts/{factory-test.sh => build-test.sh} | 0 scripts/capture-integration.sh | 2 +- services/{factory => build}/README.md | 0 services/{factory => build}/package.json | 13 +- services/{factory => build}/pnpm-lock.yaml | 356 +++++++++++-- services/{factory => build}/src/config.ts | 4 + services/{factory => build}/src/index.ts | 0 .../src/tasks/combine_video_segments.ts | 173 ++++++- .../src/tasks/generate_thumbnail.ts | 51 +- .../src/tasks/process_video.ts | 0 .../src/tasks/remux_video.ts | 0 .../src/utils/importDirectory.ts | 0 services/{factory => build}/tsconfig.json | 0 services/capture/pnpm-lock.yaml | 28 -- services/capture/src/RecordNextGeneration.ts | 469 +++++++----------- services/capture/src/config.ts | 6 + services/capture/src/index.ts | 7 +- .../migrations/00072_create-builds-table.sql | 33 ++ .../00073_update-builds-table-timestamps.sql | 14 + ...74_switch-after-update-to-after-insert.sql | 7 + .../00075_add-checksum-to-segments.sql | 2 + ...00076_builds-use-vod_id-instead-of-vod.sql | 8 + ...77_builds-trigger-change-vod-to-vod_id.sql | 22 + .../00078_add-bytes_uploaded-to-segments.sql | 2 + services/scout/src/fastify.ts | 18 +- services/scout/src/ytdlp.ts | 30 +- 40 files changed, 1012 insertions(+), 501 deletions(-) rename charts/fp/templates/{factory.yaml => build.yaml} (73%) rename dockerfiles/{factory.dockerfile => build.dockerfile} (78%) create mode 100644 packages/utils/src/error.ts rename scripts/{factory-test.sh => build-test.sh} (100%) rename services/{factory => build}/README.md (100%) rename services/{factory => build}/package.json (88%) rename services/{factory => build}/pnpm-lock.yaml (90%) rename services/{factory => build}/src/config.ts (92%) rename services/{factory => build}/src/index.ts (100%) rename services/{factory => build}/src/tasks/combine_video_segments.ts (56%) rename services/{factory => build}/src/tasks/generate_thumbnail.ts (57%) rename services/{factory => build}/src/tasks/process_video.ts (100%) rename services/{factory => build}/src/tasks/remux_video.ts (100%) rename services/{factory => build}/src/utils/importDirectory.ts (100%) rename services/{factory => build}/tsconfig.json (100%) create mode 100644 services/migrations/migrations/00072_create-builds-table.sql create mode 100644 services/migrations/migrations/00073_update-builds-table-timestamps.sql create mode 100644 services/migrations/migrations/00074_switch-after-update-to-after-insert.sql create mode 100644 services/migrations/migrations/00075_add-checksum-to-segments.sql create mode 100644 services/migrations/migrations/00076_builds-use-vod_id-instead-of-vod.sql create mode 100644 services/migrations/migrations/00077_builds-trigger-change-vod-to-vod_id.sql create mode 100644 services/migrations/migrations/00078_add-bytes_uploaded-to-segments.sql diff --git a/Tiltfile b/Tiltfile index 1c6ab2b..8854403 100644 --- a/Tiltfile +++ b/Tiltfile @@ -279,10 +279,10 @@ cmd_button('pgadmin4:restore', icon_name='hub', text='import connection', ) -cmd_button('factory:test', - argv=['./scripts/factory-test.sh'], - resource='factory', - icon_name='factory', +cmd_button('build:test', + argv=['./scripts/build-test.sh'], + resource='build', + icon_name='build', text='test', ) @@ -314,12 +314,12 @@ docker_build( ) docker_build( - 'fp/factory', + 'fp/build', '.', - dockerfile='./dockerfiles/factory.dockerfile', + dockerfile='./dockerfiles/build.dockerfile', target='dev', live_update=[ - sync('./services/factory', '/app/services/factory') + sync('./services/build', '/app/services/build') ], pull=False, ) @@ -465,7 +465,7 @@ k8s_resource( labels=['backend'], ) k8s_resource( - workload='factory', + workload='build', resource_deps=['postgrest'], labels=['backend'], ) @@ -521,12 +521,6 @@ k8s_resource( labels=['backend'], resource_deps=['postgrest'], ) -# k8s_resource( -# workload='capture-api', -# port_forwards=['5003'], -# labels=['backend'], -# resource_deps=['postgrest', 'postgresql-primary'], -# ) k8s_resource( workload='capture-worker', labels=['backend'], diff --git a/charts/fp/templates/factory.yaml b/charts/fp/templates/build.yaml similarity index 73% rename from charts/fp/templates/factory.yaml rename to charts/fp/templates/build.yaml index 54506c5..e1bbf54 100644 --- a/charts/fp/templates/factory.yaml +++ b/charts/fp/templates/build.yaml @@ -3,23 +3,26 @@ apiVersion: apps/v1 kind: Deployment metadata: - name: factory + name: build namespace: futureporn labels: - app.kubernetes.io/name: factory + app.kubernetes.io/name: build spec: - replicas: {{ .Values.factory.replicas }} + replicas: {{ .Values.build.replicas }} selector: matchLabels: - app: factory + app: build template: metadata: labels: - app: factory + app: build spec: containers: - - name: factory - image: "{{ .Values.factory.imageName }}" + - name: build + image: "{{ .Values.build.imageName }}" + volumeMounts: + - name: capture-worker-cache + mountPath: "{{ .Values.capture.cache.dir }}" env: - name: WORKER_CONNECTION_STRING valueFrom: @@ -35,6 +38,8 @@ spec: value: "{{ .Values.postgrest.url }}" - name: SCOUT_URL value: "{{ .Values.scout.url }}" + - name: CACHE_DIR + value: "{{ .Values.capture.cache.dir }}" - name: S3_ENDPOINT value: "{{ .Values.s3.endpoint }}" - name: S3_REGION @@ -59,3 +64,7 @@ spec: memory: 1Gi restartPolicy: Always + volumes: + - name: capture-worker-cache + persistentVolumeClaim: + claimName: capture-worker-cache-pvc \ No newline at end of file diff --git a/charts/fp/templates/capture.yaml b/charts/fp/templates/capture.yaml index cede202..d8daee3 100644 --- a/charts/fp/templates/capture.yaml +++ b/charts/fp/templates/capture.yaml @@ -1,5 +1,18 @@ +--- +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: capture-worker-cache-pvc + namespace: futureporn +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: {{ .Values.capture.cache.size }} + storageClassName: {{ .Values.storageClassName }} --- @@ -20,9 +33,26 @@ spec: labels: app: capture-worker spec: + # IDK if I need this initContainer. + # initContainers: + # - name: capture-worker-init + # image: busybox:latest + # command: + # - "/bin/mkdir" + # args: + # - "-p" + # - "/var/cache/taco-test" + # - "/tmp/test1" + # - "/test123" + # volumeMounts: + # - name: capture-worker-cache + # mountPath: "{{ .Values.capture.cache.dir }}" containers: - name: capture-worker image: "{{ .Values.capture.imageName }}" + volumeMounts: + - name: capture-worker-cache + mountPath: "{{ .Values.capture.cache.dir }}" env: # - name: NODE_DEBUG # value: "stream.onWriteComplete" @@ -30,6 +60,8 @@ spec: value: "{{ .Values.scout.url }}" - name: FUNCTION value: worker + - name: WORKER_CONCURRENCY + value: "1" - name: WORKER_CONNECTION_STRING valueFrom: secretKeyRef: @@ -47,8 +79,8 @@ spec: key: httpProxy - name: POSTGREST_URL value: "{{ .Values.postgrest.url }}" - - name: PORT - value: "{{ .Values.capture.api.port }}" + - name: CACHE_DIR + value: "{{ .Values.capture.cache.dir }}" - name: S3_ENDPOINT value: "{{ .Values.s3.endpoint }}" - name: S3_REGION @@ -70,4 +102,7 @@ spec: cpu: 250m memory: 1024Mi restartPolicy: Always - + volumes: + - name: capture-worker-cache + persistentVolumeClaim: + claimName: capture-worker-cache-pvc \ No newline at end of file diff --git a/charts/fp/values.yaml b/charts/fp/values.yaml index 50d626c..224a04d 100644 --- a/charts/fp/values.yaml +++ b/charts/fp/values.yaml @@ -24,18 +24,18 @@ capture: imageName: fp/capture worker: replicas: 1 - api: - port: 5003 - replicas: 1 + cache: + size: 20Gi + dir: /var/cache/capture-worker mailbox: imageName: fp/mailbox replicas: 1 cdnBucketUrl: https://fp-dev.b-cdn.net s3BucketName: fp-dev port: 5000 -factory: +build: replicas: 1 - imageName: fp/factory + imageName: fp/build strapi: replicas: 1 imageName: fp/strapi diff --git a/dockerfiles/factory.dockerfile b/dockerfiles/build.dockerfile similarity index 78% rename from dockerfiles/factory.dockerfile rename to dockerfiles/build.dockerfile index 1596c81..3476bb9 100644 --- a/dockerfiles/factory.dockerfile +++ b/dockerfiles/build.dockerfile @@ -1,7 +1,7 @@ -## d.factory.dockerfile +## d.build.dockerfile ## -## @futureporn/factory is the system component which processes video segments into a VOD. -## Factory does tasks such as thumbnail generation, video encoding, file transfers, strapi record creation, etc. +## @futureporn/build is the system component which processes video segments into a VOD. +## Build does tasks such as thumbnail generation, video encoding, file transfers, strapi record creation, etc. FROM node:20 AS base @@ -15,7 +15,7 @@ ENTRYPOINT ["pnpm"] FROM base AS install WORKDIR /app -RUN mkdir -p /app/services/factory && mkdir -p /prod/factory +RUN mkdir -p /app/services/build && mkdir -p /prod/build ## Copy manfiests, lockfiles, and configs into docker context COPY package.json pnpm-lock.yaml .npmrc . @@ -23,7 +23,7 @@ COPY ./packages/utils/pnpm-lock.yaml ./packages/utils/package.json ./packages/ut COPY ./packages/fetchers/package.json ./packages/fetchers/pnpm-lock.yaml ./packages/fetchers/ COPY ./packages/storage/pnpm-lock.yaml ./packages/storage/package.json ./packages/storage/ COPY ./packages/types/pnpm-lock.yaml ./packages/types/package.json ./packages/types/ -COPY ./services/factory/pnpm-lock.yaml ./services/factory/package.json ./services/factory/ +COPY ./services/build/pnpm-lock.yaml ./services/build/package.json ./services/build/ ## Install npm packages RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch @@ -37,7 +37,7 @@ RUN ls -la /app/packages/utils/node_modules/prevvy/ RUn cat ./packages/utils/package.json COPY ./packages/storage/ ./packages/storage/ COPY ./packages/types/ ./packages/types/ -COPY ./services/factory/ ./services/factory/ +COPY ./services/build/ ./services/build/ # we are grabbing the mp4 files from capture so we can run tests with them COPY ./services/capture/src/fixtures ./services/capture/src/fixtures @@ -49,15 +49,15 @@ RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm -r build ## Copy all production code into one place ## `pnpm deploy` copies all dependencies into an isolated node_modules directory inside the target dir ## @see https://pnpm.io/cli/deploy -RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm deploy --filter=@futureporn/factory --prod /prod/factory +RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm deploy --filter=@futureporn/build --prod /prod/build FROM install AS dev -WORKDIR /app/services/factory +WORKDIR /app/services/build RUN ls -lash CMD ["run", "dev"] -FROM base AS factory -COPY --from=build /prod/factory . +FROM base AS prod +COPY --from=build /prod/build . RUN ls -la . CMD ["start"] diff --git a/packages/fetchers/src/createSegment.ts b/packages/fetchers/src/createSegment.ts index bc7b9d2..f21116e 100644 --- a/packages/fetchers/src/createSegment.ts +++ b/packages/fetchers/src/createSegment.ts @@ -8,7 +8,7 @@ export default async function createSegment(s3_key: string, vod_id: string): Pro s3_key, vod_id } - const res = await fetch(`${configs.postgrestUrl}/segments`, { + const res = await fetch(`${configs.postgrestUrl}/segments?select=*,vod:vods(*,recordings(*))`, { method: 'POST', headers: { 'Content-Type': 'application/json', diff --git a/packages/fetchers/src/getPlaylistUrl.ts b/packages/fetchers/src/getPlaylistUrl.ts index 75bc64b..77be74d 100644 --- a/packages/fetchers/src/getPlaylistUrl.ts +++ b/packages/fetchers/src/getPlaylistUrl.ts @@ -1,21 +1,23 @@ import { configs } from './config.ts' +import { type GenericApiResponse } from '@futureporn/types' -export default async function getPlaylistUrl (url: string): Promise { +export default async function getPlaylistUrl (url: string): Promise { if (!url) throw new Error(`getPlaylistUrl requires a url, but it was undefined.`); const res = await fetch(`${configs.scoutUrl}/ytdlp/playlist-url?url=${url}`) if (!res.ok) { const body = await res.text() console.error(`failed to getPlaylistUrl res.status=${res.status}, res.statusText=${res.statusText}, body=${body}`) - return null - } else { - const data = await res.json() as any - console.log(`>>>>>> getPlaylistUrl got a data payload as follows`) - console.log(data) - if (!!data.error) { - return null; - } else { - return data.data.url + return { + error: 'PlaylistFailedError', + detail: `failed to getPlaylistUrl. res.status=${res.status}, res.statusText=${res.statusText}, body=${body}`, + data: null, + message: 'something went wrong wile fetching data from @futureporn/scout' } + } else { + const payload = await res.json() as any + console.log(`>>>>>> getPlaylistUrl data=${payload.data}, error=${payload.error} got a data payload as follows.`) + console.log(payload) + return payload } } \ No newline at end of file diff --git a/packages/fetchers/src/updateSegmentInDatabase.ts b/packages/fetchers/src/updateSegmentInDatabase.ts index 5d28509..5473ae2 100644 --- a/packages/fetchers/src/updateSegmentInDatabase.ts +++ b/packages/fetchers/src/updateSegmentInDatabase.ts @@ -11,14 +11,17 @@ import { configs } from './config.ts' */ export default async function updateSegmentInDatabase({ segment_id, - fileSize, + bytes, + bytes_uploaded, }: { segment_id: string, - fileSize: number, + bytes: number, + bytes_uploaded: number, }): Promise { const payload: any = { - bytes: fileSize + bytes, + bytes_uploaded } const fetchUrl =`${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=vod:vods(recording:recordings(is_aborted))` diff --git a/packages/storage/src/s3.ts b/packages/storage/src/s3.ts index 0ba5cf9..f3d67cc 100644 --- a/packages/storage/src/s3.ts +++ b/packages/storage/src/s3.ts @@ -1,6 +1,6 @@ import { S3Client, GetObjectCommand, type S3ClientConfig } from "@aws-sdk/client-s3"; import { Upload } from "@aws-sdk/lib-storage"; -import type { S3FileResponse, S3FileRecord, Stream } from '@futureporn/types'; +import type { S3FileResponse, S3FileRecord, StreamResponse } from '@futureporn/types'; import { createId } from '@paralleldrive/cuid2'; import { basename } from 'node:path'; import fs, { createWriteStream, createReadStream } from 'node:fs'; diff --git a/packages/types/src/index.ts b/packages/types/src/index.ts index 45620bb..7e8fc7c 100644 --- a/packages/types/src/index.ts +++ b/packages/types/src/index.ts @@ -7,6 +7,15 @@ export type ProcessingState = 'processing' export type WaitingState = 'pending_recording' export type Status = Partial +// @see https://www.baeldung.com/rest-api-error-handling-best-practices +export interface GenericApiResponse { + error: string|null; + data: any; + message: string; + detail?: string; + help?: string; +} + export interface S3FileRecord { s3_key: string; s3_id?: string; @@ -226,6 +235,8 @@ export interface SegmentResponse { s3_key: string; s3_id: string; bytes: number; + bytes_uploaded: number; + checksum: string; vod?: VodResponse; created_at: string; updated_at: string; diff --git a/packages/utils/src/error.ts b/packages/utils/src/error.ts new file mode 100644 index 0000000..0d307be --- /dev/null +++ b/packages/utils/src/error.ts @@ -0,0 +1,76 @@ + +export class ExhaustedRetriesError extends Error { + constructor(message?: string) { + super(message) + Object.setPrototypeOf(this, ExhaustedRetriesError.prototype) + this.name = this.constructor.name + this.message = `ExhaustedRetries: We retried the request the maximum amount of times.` + } + getErrorMessage() { + return this.message + } +} + +export class RoomOfflineError extends Error { + constructor(message?: string) { + super(message) + Object.setPrototypeOf(this, RoomOfflineError.prototype) + this.name = this.constructor.name + this.message = `RoomOffline. ${this.message}` + } + getErrorMessage() { + return this.message + } +} + + +export class AdminAbortedError extends Error { + constructor(message?: string) { + super(message) + Object.setPrototypeOf(this, AdminAbortedError.prototype) + this.name = this.constructor.name + this.message = `AdminAbortedError. ${this.message}` + } + getErrorMessage() { + return this.message + } +} + + +export class UploadFailedError extends Error { + constructor(message?: string) { + super(message) + Object.setPrototypeOf(this, UploadFailedError.prototype) + this.name = this.constructor.name + this.message = `UploadFailedError. ${this.message}` + } + getErrorMessage() { + return this.message + } +} + +export class PlaylistFailedError extends Error { + constructor(message?: string) { + super(message) + Object.setPrototypeOf(this, PlaylistFailedError.prototype) + this.name = this.constructor.name + this.message = `PlaylistFailedError. ${this.message}` + } + getErrorMessage() { + return this.message + } +} + + + +export class DownloadFailedError extends Error { + constructor(message?: string) { + super(message) + Object.setPrototypeOf(this, DownloadFailedError.prototype) + this.name = this.constructor.name + this.message = `DownloadFailedError. ${this.message}` + } + getErrorMessage() { + return this.message + } +} \ No newline at end of file diff --git a/packages/utils/src/file.spec.ts b/packages/utils/src/file.spec.ts index 7a5546a..2ba4f0c 100644 --- a/packages/utils/src/file.spec.ts +++ b/packages/utils/src/file.spec.ts @@ -1,10 +1,11 @@ -import { getTmpFile, download, getPackageVersion } from './file.ts' +import { getTmpFile, download, getPackageVersion, getFileChecksum } from './file.ts' import { expect } from 'chai' import { describe } from 'mocha' import { dirname, basename, join, isAbsolute } from 'node:path'; import { fileURLToPath } from 'url'; export const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(fileURLToPath(import.meta.url)); +const fixtureImage0 = join(__dirname, './fixtures/sample.webp') describe('file', function () { describe('integration', function () { @@ -30,5 +31,11 @@ describe('file', function () { expect(getPackageVersion('../package.json')).to.match(/\d+\.\d+\.\d+/) }) }) + describe('getFileChecksum', function () { + it('should get a MD5sum of a file', async function () { + const checksum = await getFileChecksum(fixtureImage0) + expect(checksum).to.equal('a1fe79a39fa6e63d0faca57527792823') + }) + }) }) }) \ No newline at end of file diff --git a/packages/utils/src/file.ts b/packages/utils/src/file.ts index ecf5da5..412d26a 100644 --- a/packages/utils/src/file.ts +++ b/packages/utils/src/file.ts @@ -5,6 +5,7 @@ import { Readable } from 'stream'; import { finished } from 'stream/promises'; import { dirname, basename, join, isAbsolute } from 'node:path'; import { fileURLToPath } from 'url'; +import { createHash } from 'node:crypto'; export const __filename = fileURLToPath(import.meta.url); const __dirname = dirname(fileURLToPath(import.meta.url)); const ua0 = 'Mozilla/5.0 (X11; Linux x86_64; rv:105.0) Gecko/20100101 Firefox/105.0' @@ -77,4 +78,20 @@ export async function download({ return filePath; } -export const tmpFileRegex = /^\/tmp\/.*\.jpg$/; \ No newline at end of file +export const tmpFileRegex = /^\/tmp\/.*\.jpg$/; + + + +/** + * getFileChecksum + * greetz https://stackoverflow.com/a/44643479/1004931 + */ +export async function getFileChecksum(filePath: string, algorithm = 'md5') { + return new Promise((resolve, reject) => { + const hash = createHash(algorithm); + const stream = fs.createReadStream(filePath); + stream.on('error', err => reject(err)); + stream.on('data', chunk => hash.update(chunk)); + stream.on('end', () => resolve(hash.digest('hex'))); + }); +} diff --git a/packages/utils/src/image.spec.ts b/packages/utils/src/image.spec.ts index 42ceb94..6d22a36 100644 --- a/packages/utils/src/image.spec.ts +++ b/packages/utils/src/image.spec.ts @@ -30,7 +30,7 @@ describe('image', function () { describe('getStoryboard', function () { this.timeout(1000*60*15) it('should accept a URL and return a path to image on disk', async function () { - const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-09-25.mp4' + const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-09-27.mp4' const imagePath = await getStoryboard(url) expect(imagePath).to.match(/\.png/) }) diff --git a/scripts/factory-test.sh b/scripts/build-test.sh similarity index 100% rename from scripts/factory-test.sh rename to scripts/build-test.sh diff --git a/scripts/capture-integration.sh b/scripts/capture-integration.sh index 98e449e..f7fe5ae 100755 --- a/scripts/capture-integration.sh +++ b/scripts/capture-integration.sh @@ -22,4 +22,4 @@ curl -sL -H "Authorization: Bearer ${AUTOMATION_USER_JWT}" \ -H "Content-Type: application/json" \ -d '{"url": "'"${url}"'"}' \ http://localhost:9000/recordings -echo "finished creating recording" \ No newline at end of file +echo "recording created" \ No newline at end of file diff --git a/services/factory/README.md b/services/build/README.md similarity index 100% rename from services/factory/README.md rename to services/build/README.md diff --git a/services/factory/package.json b/services/build/package.json similarity index 88% rename from services/factory/package.json rename to services/build/package.json index f39dede..b1bde55 100644 --- a/services/factory/package.json +++ b/services/build/package.json @@ -1,7 +1,7 @@ { - "name": "@futureporn/factory", + "name": "@futureporn/build", "type": "module", - "version": "1.0.0", + "version": "2.0.0", "description": "", "main": "src/index.ts", "scripts": { @@ -15,7 +15,9 @@ "transcode", "transcoding", "process", - "processing" + "processing", + "build", + "factory" ], "author": "@cj_clippy", "license": "Unlicense", @@ -28,13 +30,12 @@ "@paralleldrive/cuid2": "^2.2.2", "@types/node": "^22.5.2", "dotenv": "^16.4.5", - "fluture": "^14.0.0", "graphile-worker": "^0.16.6", - "ramda": "^0.30.1" + "p-retry": "^6.2.0" }, "devDependencies": { "@futureporn/types": "workspace:^", - "@types/ramda": "^0.30.2", + "mocha": "^10.7.3", "nodemon": "^3.1.4", "ts-node": "^10.9.2", "tsx": "^4.19.0" diff --git a/services/factory/pnpm-lock.yaml b/services/build/pnpm-lock.yaml similarity index 90% rename from services/factory/pnpm-lock.yaml rename to services/build/pnpm-lock.yaml index 74878ba..79281ba 100644 --- a/services/factory/pnpm-lock.yaml +++ b/services/build/pnpm-lock.yaml @@ -32,22 +32,19 @@ importers: dotenv: specifier: ^16.4.5 version: 16.4.5 - fluture: - specifier: ^14.0.0 - version: 14.0.0 graphile-worker: specifier: ^0.16.6 version: 0.16.6(typescript@5.5.4) - ramda: - specifier: ^0.30.1 - version: 0.30.1 + p-retry: + specifier: ^6.2.0 + version: 6.2.0 devDependencies: '@futureporn/types': specifier: workspace:^ version: link:../../packages/types - '@types/ramda': - specifier: ^0.30.2 - version: 0.30.2 + mocha: + specifier: ^10.7.3 + version: 10.7.3 nodemon: specifier: ^3.1.4 version: 3.1.4 @@ -642,8 +639,8 @@ packages: '@types/pg@8.11.8': resolution: {integrity: sha512-IqpCf8/569txXN/HoP5i1LjXfKZWL76Yr2R77xgeIICUbAYHeoaEZFhYHo2uDftecLWrTJUq63JvQu8q3lnDyA==} - '@types/ramda@0.30.2': - resolution: {integrity: sha512-PyzHvjCalm2BRYjAU6nIB3TprYwMNOUY/7P/N8bSzp9W/yM2YrtGtAnnVtaCNSeOZ8DzKyFDvaqQs7LnWwwmBA==} + '@types/retry@0.12.2': + resolution: {integrity: sha512-XISRgDJ2Tc5q4TRqvgJtzsRkFYNJzZrhTdtMoGVBttwzzQJkPnS3WWTFc7kuDRoPtPakl+T+OfdEUjYJj7Jbow==} '@types/semver@7.5.8': resolution: {integrity: sha512-I8EUhyrgfLrcTkzV3TSsGyl1tSuPrEDzr0yd5m90UgNxQkyDXULk3b6MlQqTCpZpNtWe1K0hzclnZkTcLBe2UQ==} @@ -657,6 +654,10 @@ packages: engines: {node: '>=0.4.0'} hasBin: true + ansi-colors@4.1.3: + resolution: {integrity: sha512-/6w/C21Pm1A7aZitlI5Ni/2J6FFQN8i1Cvz3kHABAAbw93v/NlvKdVOqz7CCWz/3iv/JplRSEEZ83XION15ovw==} + engines: {node: '>=6'} + ansi-regex@5.0.1: resolution: {integrity: sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ==} engines: {node: '>=8'} @@ -695,10 +696,16 @@ packages: brace-expansion@1.1.11: resolution: {integrity: sha512-iCuPHDFgrHX7H2vEI/5xpz07zSHB00TpugqhmYtVmMO6518mCuRMoOYFldEBl0g187ufozdaHgWKcYFb61qGiA==} + brace-expansion@2.0.1: + resolution: {integrity: sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==} + braces@3.0.3: resolution: {integrity: sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==} engines: {node: '>=8'} + browser-stdout@1.3.1: + resolution: {integrity: sha512-qhAVI1+Av2X7qelOfAIYwXONood6XlZE/fXaBSmW/T5SzLAmCgzi+eiWE7fUvbHaeNBQH13UftjpXxsfLkMpgw==} + buffer@5.6.0: resolution: {integrity: sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==} @@ -706,6 +713,10 @@ packages: resolution: {integrity: sha512-P8BjAsXvZS+VIDUI11hHCQEv74YT67YUi5JJFNWIqL235sBmjX4+qx9Muvls5ivyNENctx46xQLQ3aTuE7ssaQ==} engines: {node: '>=6'} + camelcase@6.3.0: + resolution: {integrity: sha512-Gmy6FhYlCY7uOElZUSbxo2UCDH8owEk996gkbrpsgGtrJLM3J7jGxl9Ic7Qwwj4ivOE5AWZWRMecDdF7hqGjFA==} + engines: {node: '>=10'} + chalk@2.4.2: resolution: {integrity: sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ==} engines: {node: '>=4'} @@ -718,6 +729,9 @@ packages: resolution: {integrity: sha512-7VT13fmjotKpGipCW9JEQAusEPE+Ei8nl6/g4FBAmIm0GOOLMua9NDDo/DWp0ZAxCr3cPq5ZpBqmPAQgDda2Pw==} engines: {node: '>= 8.10.0'} + cliui@7.0.4: + resolution: {integrity: sha512-OcRE68cOsVMXp1Yvonl/fzkQOyjLSu/8bhPDfQt0e0/Eb283TKP20Fs2MqoPsr9SwA595rRCA+QMzYc9nBP+JQ==} + cliui@8.0.1: resolution: {integrity: sha512-BSeNnyus75C4//NQ9gQt1/csTXyo/8Sb+afLAkzAptFuMsod9HFokGNudZpi/oQV73hnVK+sR+5PVRMd+Dr7YQ==} engines: {node: '>=12'} @@ -759,10 +773,18 @@ packages: supports-color: optional: true + decamelize@4.0.0: + resolution: {integrity: sha512-9iE1PgSik9HeIIw2JO94IidnE3eBoQrFJ3w7sFuzSX4DpmZ3v5sZpUiV5Swcf6mQEF+Y0ru8Neo+p+nyh2J+hQ==} + engines: {node: '>=10'} + diff@4.0.2: resolution: {integrity: sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==} engines: {node: '>=0.3.1'} + diff@5.2.0: + resolution: {integrity: sha512-uIFDxqpRZGZ6ThOk84hEfqWoHx2devRFvpTZcTHur85vImfaxUbTW9Ryh4CpCuDnToOP1CEtXKIgytHBPVff5A==} + engines: {node: '>=0.3.1'} + dotenv@16.4.5: resolution: {integrity: sha512-ZmdL2rui+eB2YwhsWzjInR8LldtZHGDoQ1ugH85ppHKwpUHL7j7rN0Ti9NCnGiQbhaZ11FpR+7ao1dNsmduNUg==} engines: {node: '>=12'} @@ -786,6 +808,10 @@ packages: resolution: {integrity: sha512-vbRorB5FUQWvla16U8R/qgaFIya2qGzwDrNmCZuYKrbdSUMG6I1ZCGQRefkRVhuOkIGVne7BQ35DSfo1qvJqFg==} engines: {node: '>=0.8.0'} + escape-string-regexp@4.0.0: + resolution: {integrity: sha512-TtpcNJ3XAzx3Gq8sWRzJaVajRs0uVxA2YAkdb1jm2YkPz4G6egUFAyA3n5vtEIZefPk5Wa4UXbKuS5fKkJWdgA==} + engines: {node: '>=10'} + events@3.3.0: resolution: {integrity: sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==} engines: {node: '>=0.8.x'} @@ -798,9 +824,16 @@ packages: resolution: {integrity: sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==} engines: {node: '>=8'} - fluture@14.0.0: - resolution: {integrity: sha512-pENtLF948a8DfduVKugT8edTAbFi4rBS94xjHwzLanQqIu5PYtLGl+xqs6H8TaIRL7z/B0cDpswdINzH/HRUGA==} - engines: {node: '>=4.0.0'} + find-up@5.0.0: + resolution: {integrity: sha512-78/PXT1wlLLDgTzDs7sjq9hzz0vXD+zn+7wypEe4fXQxCmdmqfGsEPQxmiCSQI3ajFV91bVSsvNtrJRiW6nGng==} + engines: {node: '>=10'} + + flat@5.0.2: + resolution: {integrity: sha512-b6suED+5/3rTpUBdG1gupIl8MPFCAMA0QXwmljLhvCUKcUvdE4gWky9zpuGCcXHOsz4J9wPGNWq6OKpmIzz3hQ==} + hasBin: true + + fs.realpath@1.0.0: + resolution: {integrity: sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==} fsevents@2.3.3: resolution: {integrity: sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw==} @@ -818,6 +851,11 @@ packages: resolution: {integrity: sha512-AOIgSQCepiJYwP3ARnGx+5VnTu2HBYdzbGP45eLw1vr3zB3vZLeyed1sC9hnbcOc9/SrMyM5RPQrkGz4aS9Zow==} engines: {node: '>= 6'} + glob@8.1.0: + resolution: {integrity: sha512-r8hpEjiQEYlF2QU0df3dS+nxxSIreXQS1qRhMJM0Q5NDdR386C7jb7Hwwod8Fgiuex+k0GFjgft18yvxm5XoCQ==} + engines: {node: '>=12'} + deprecated: Glob versions prior to v9 are no longer supported + graphile-config@0.0.1-beta.9: resolution: {integrity: sha512-7vNxXZ24OAgXxDKXYi9JtgWPMuNbBL3057Yf32Ux+/rVP4+EePgySCc+NNnn0tORi8qwqVreN8bdWqGIcSwNXg==} engines: {node: '>=16'} @@ -835,6 +873,10 @@ packages: resolution: {integrity: sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==} engines: {node: '>=8'} + he@1.2.0: + resolution: {integrity: sha512-F/1DnUGPopORZi0ni+CvrCgHQ5FyEAHRLSApuYWMmrbSwoN2Mn/7k+Gl38gJnR7yyDZk6WLXwiGod1JOWNDKGw==} + hasBin: true + ieee754@1.2.1: resolution: {integrity: sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==} @@ -845,6 +887,10 @@ packages: resolution: {integrity: sha512-veYYhQa+D1QBKznvhUHxb8faxlrwUnxseDAbAp457E0wLNio2bOSKnjYDhMj+YiAq61xrMGhQk9iXVk5FzgQMw==} engines: {node: '>=6'} + inflight@1.0.6: + resolution: {integrity: sha512-k92I/b08q4wvFscXCLvqfsHCrjrF7yiXsQuIVvVE7N82W3+aqpzuUdBbfhWcy/FZR3/4IgflMgKLOsvPDrGCJA==} + deprecated: This module is not supported, and leaks memory. Do not use it. Check out lru-cache if you want a good and tested way to coalesce async requests by a key value, which is much more comprehensive and powerful. + inherits@2.0.4: resolution: {integrity: sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==} @@ -871,10 +917,22 @@ packages: resolution: {integrity: sha512-xelSayHH36ZgE7ZWhli7pW34hNbNl8Ojv5KVmkJD4hBdD3th8Tfk9vYasLM+mXWOZhFkgZfxhLSnrwRr4elSSg==} engines: {node: '>=0.10.0'} + is-network-error@1.1.0: + resolution: {integrity: sha512-tUdRRAnhT+OtCZR/LxZelH/C7QtjtFrTu5tXCA8pl55eTUElUHT+GPYV8MBMBvea/j+NxQqVt3LbWMRir7Gx9g==} + engines: {node: '>=16'} + is-number@7.0.0: resolution: {integrity: sha512-41Cifkg6e8TylSpdtTpeLVMqvSBEVzTttHvERD741+pnZ8ANv0004MRL43QKPDlK9cGvNp6NZWZUBlbGXYxxng==} engines: {node: '>=0.12.0'} + is-plain-obj@2.1.0: + resolution: {integrity: sha512-YWnfyRwxL/+SsrWYfOpUtz5b3YD+nyfkHvjbcanzk8zgyO4ASD67uVMRt8k5bM4lLMDnXfriRhOpemw+NfT1eA==} + engines: {node: '>=8'} + + is-unicode-supported@0.1.0: + resolution: {integrity: sha512-knxG2q4UC3u8stRGyAVJCOdxFmv5DZiRcdlIaAQXAbSfJya+OhopNotLQrstBhququ4ZpuKbDc/8S6mgXgPFPw==} + engines: {node: '>=10'} + js-tokens@4.0.0: resolution: {integrity: sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==} @@ -893,15 +951,35 @@ packages: lines-and-columns@1.2.4: resolution: {integrity: sha512-7ylylesZQ/PV29jhEDl3Ufjo6ZX7gCqJr5F7PKrqc93v7fzSymt1BpwEU8nAUXs8qzzvqhbjhK5QZg6Mt/HkBg==} + locate-path@6.0.0: + resolution: {integrity: sha512-iPZK6eYjbxRu3uB4/WZ3EsEIMJFMqAoopl3R+zuq0UjcAm/MO6KCweDgPfP3elTztoKP3KtnVHxTn2NHBSDVUw==} + engines: {node: '>=10'} + + log-symbols@4.1.0: + resolution: {integrity: sha512-8XPvpAA8uyhfteu8pIvQxpJZ7SYYdpUivZpGy6sFsBuKRY/7rQGavedeB8aK+Zkyq6upMFVL/9AW6vOYzfRyLg==} + engines: {node: '>=10'} + make-error@1.3.6: resolution: {integrity: sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==} minimatch@3.1.2: resolution: {integrity: sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==} + minimatch@5.1.6: + resolution: {integrity: sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g==} + engines: {node: '>=10'} + + mocha@10.7.3: + resolution: {integrity: sha512-uQWxAu44wwiACGqjbPYmjo7Lg8sFrS3dQe7PP2FQI+woptP4vZXSMcfMyFL/e1yFEeEpV4RtyTpZROOKmxis+A==} + engines: {node: '>= 14.0.0'} + hasBin: true + ms@2.1.2: resolution: {integrity: sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==} + ms@2.1.3: + resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==} + nodemon@3.1.4: resolution: {integrity: sha512-wjPBbFhtpJwmIeY2yP7QF+UKzPfltVGtfce1g/bB15/8vCGZj8uxD62b/b9M9/WVgme0NZudpownKN+c0plXlQ==} engines: {node: '>=10'} @@ -914,6 +992,21 @@ packages: obuf@1.1.2: resolution: {integrity: sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg==} + once@1.4.0: + resolution: {integrity: sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==} + + p-limit@3.1.0: + resolution: {integrity: sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==} + engines: {node: '>=10'} + + p-locate@5.0.0: + resolution: {integrity: sha512-LaNjtRWUBY++zB5nE/NwcaoMylSPk+S+ZHNB1TzdbMJMny6dynpAGt7X/tl/QYq3TIeE6nxHppbo2LGymrG5Pw==} + engines: {node: '>=10'} + + p-retry@6.2.0: + resolution: {integrity: sha512-JA6nkq6hKyWLLasXQXUrO4z8BUZGUt/LjlJxx8Gb2+2ntodU/SS63YZ8b0LUTbQ8ZB9iwOfhEPhg4ykKnn2KsA==} + engines: {node: '>=16.17'} + parent-module@1.0.1: resolution: {integrity: sha512-GQ2EWRpQV8/o+Aw8YqtfZZPfNRWZYkbidE9k5rpl/hC3vtHHBfGm2Ifi6qWV+coDGkrUKZAxE3Lot5kcsRlh+g==} engines: {node: '>=6'} @@ -922,6 +1015,10 @@ packages: resolution: {integrity: sha512-ayCKvm/phCGxOkYRSCM82iDwct8/EonSEgCSxWxD7ve6jHggsFl4fZVQBPRNgQoKiuV/odhFrGzQXZwbifC8Rg==} engines: {node: '>=8'} + path-exists@4.0.0: + resolution: {integrity: sha512-ak9Qy5Q7jYb2Wwcey5Fpvg2KoAc/ZIhLSLOSBmRmygPsGwkVVt0fZa0qrtMz+m6tJTAHfZQ8FnmB4MG4LWy7/w==} + engines: {node: '>=8'} + path-type@4.0.0: resolution: {integrity: sha512-gDKb8aZMDeD/tZWs9P6+q0J9Mwkdl6xMV8TjnGP3qJVJ06bdMgkbBlLU8IdfOsIsFz2BW1rNVT3XuNEl8zPAvw==} engines: {node: '>=8'} @@ -1013,8 +1110,8 @@ packages: pstree.remy@1.1.8: resolution: {integrity: sha512-77DZwxQmxKnu3aR542U+X8FypNzbfJ+C5XQDk3uWjWxn6151aIMGthWYRXTqT1E5oJvg+ljaa2OJi+VfvCOQ8w==} - ramda@0.30.1: - resolution: {integrity: sha512-tEF5I22zJnuclswcZMc8bDIrwRHRzf+NqVEmqg50ShAZMP7MWeR/RGDthfM/p+BlqvF2fXAzpn8i+SJcYD3alw==} + randombytes@2.1.0: + resolution: {integrity: sha512-vYl3iOX+4CKUWuxGi9Ukhie6fsqXqS9FE2Zaic4tNFD2N2QQaXOMFbuKK4QmDHC0JO6B1Zp41J0LpT0oR68amQ==} readable-stream@3.6.2: resolution: {integrity: sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==} @@ -1035,20 +1132,21 @@ packages: resolve-pkg-maps@1.0.0: resolution: {integrity: sha512-seS2Tj26TBVOC2NIc2rOe2y2ZO7efxITtLZcGSOnHHNOQ7CkiUBfw0Iw2ck6xkIhPwLhKNLS8BO+hEpngQlqzw==} + retry@0.13.1: + resolution: {integrity: sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==} + engines: {node: '>= 4'} + safe-buffer@5.2.1: resolution: {integrity: sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==} - sanctuary-show@2.0.0: - resolution: {integrity: sha512-REj4ZiioUXnDLj6EpJ9HcYDIEGaEexmB9Fg5o6InZR9f0x5PfnnC21QeU9SZ9E7G8zXSZPNjy8VRUK4safbesw==} - - sanctuary-type-identifiers@3.0.0: - resolution: {integrity: sha512-YFXYcG0Ura1dSPd/1xLYtE2XAWUEsBHhMTZvYBOvwT8MeFQwdUOCMm2DC+r94z6H93FVq0qxDac8/D7QpJj6Mg==} - semver@7.6.3: resolution: {integrity: sha512-oVekP1cKtI+CTDvHWYFUcMtsK/00wmAEfyqKfNdARm8u1wNVhSgaX7A8d4UuIlUI5e84iEwOhs7ZPYRmzU9U6A==} engines: {node: '>=10'} hasBin: true + serialize-javascript@6.0.2: + resolution: {integrity: sha512-Saa1xPByTTq2gdeFZYLLo+RFE35NHZkAbqZeWNd3BpzppeVisAqpDjcp8dyf6uIvEqJRd46jemmyA4iFIeVk8g==} + simple-update-notifier@2.0.0: resolution: {integrity: sha512-a2B9Y0KlNXl9u/vsW6sTIu9vGEpfKu2wRV6l1H3XEas/0gUIzGzBoP/IouTcUQbm9JWZLH3COxyn03TYlFax6w==} engines: {node: '>=10'} @@ -1071,6 +1169,10 @@ packages: resolution: {integrity: sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==} engines: {node: '>=8'} + strip-json-comments@3.1.1: + resolution: {integrity: sha512-6fPc+R4ihwqP6N/aIv2f1gMH8lOVtWQHoqC4yK6oSDVVocumAsfCqjkXnqiYMhmMwS/mEHLp7Vehlt3ql6lEig==} + engines: {node: '>=8'} + strnum@1.0.5: resolution: {integrity: sha512-J8bbNyKKXl5qYcR36TIO8W3mVGVHrmmxsd5PAItGkmyzwJvybiw2IVq5nqd0i4LSNSkB/sx9VHllbfFdr9k1JA==} @@ -1082,6 +1184,10 @@ packages: resolution: {integrity: sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==} engines: {node: '>=8'} + supports-color@8.1.1: + resolution: {integrity: sha512-MpUEN2OodtUzxvKQl72cUF7RQ5EiHsGvSsVG0ia9c5RbWGL2CI4C7EpPS8UTBIplnlzZiNuV56w+FuNxy3ty2Q==} + engines: {node: '>=10'} + to-regex-range@5.0.1: resolution: {integrity: sha512-65P7iz6X5yEr1cwcgvQxbbIw7Uk3gOy5dIdtZ4rDveLqhrdJP+Li/Hx6tyK0NEb+2GCyneCMJiGqrADCSNk8sQ==} engines: {node: '>=8.0'} @@ -1104,9 +1210,6 @@ packages: '@swc/wasm': optional: true - ts-toolbelt@9.6.0: - resolution: {integrity: sha512-nsZd8ZeNUzukXPlJmTBwUAuABDe/9qtVDelJeT/qW0ow3ZS3BsQJtNkan1802aM9Uf68/Y8ljw86Hu0h5IUW3w==} - tslib@2.7.0: resolution: {integrity: sha512-gLXCKdN1/j47AiHiOkJN69hJmcbGTHI0ImLmbYLHykhgeN0jVGola9yVjFgzCUklsZQMW55o+dW7IXv3RCXDzA==} @@ -1115,9 +1218,6 @@ packages: engines: {node: '>=18.0.0'} hasBin: true - types-ramda@0.30.1: - resolution: {integrity: sha512-1HTsf5/QVRmLzcGfldPFvkVsAdi1db1BBKzi7iW3KBUlOICg/nKnFS+jGqDJS3YD8VsWbAh7JiHeBvbsw8RPxA==} - typescript@5.5.4: resolution: {integrity: sha512-Mtq29sKDAEYP7aljRgtPOpTvOfbwRWlS6dPRzwjdE+C0R4brX/GUyhHSecbHMFLNBLcJIPt9nl9yG5TZ1weH+Q==} engines: {node: '>=14.17'} @@ -1139,10 +1239,16 @@ packages: v8-compile-cache-lib@3.0.1: resolution: {integrity: sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==} + workerpool@6.5.1: + resolution: {integrity: sha512-Fs4dNYcsdpYSAfVxhnl1L5zTksjvOJxtC5hzMNl+1t9B8hTJTdKDyZ5ju7ztgPy+ft9tBFXoOlDNiOT9WUXZlA==} + wrap-ansi@7.0.0: resolution: {integrity: sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==} engines: {node: '>=10'} + wrappy@1.0.2: + resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==} + xtend@4.0.2: resolution: {integrity: sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==} engines: {node: '>=0.4'} @@ -1151,10 +1257,22 @@ packages: resolution: {integrity: sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==} engines: {node: '>=10'} + yargs-parser@20.2.9: + resolution: {integrity: sha512-y11nGElTIV+CT3Zv9t7VKl+Q3hTQoT9a1Qzezhhl6Rp21gJ/IVTW7Z3y9EWXhuUBC2Shnf+DX0antecpAwSP8w==} + engines: {node: '>=10'} + yargs-parser@21.1.1: resolution: {integrity: sha512-tVpsJW7DdjecAiFpbIB1e3qxIQsE6NoPc5/eTdrbbIC4h0LVsWhnoa3g+m2HclBIujHzsxZ4VJVA+GUuc2/LBw==} engines: {node: '>=12'} + yargs-unparser@2.0.0: + resolution: {integrity: sha512-7pRTIA9Qc1caZ0bZ6RYRGbHJthJWuakf+WmHK0rVeLkNrrGhfoabBNdue6kdINI6r4if7ocq9aD/n7xwKOdzOA==} + engines: {node: '>=10'} + + yargs@16.2.0: + resolution: {integrity: sha512-D1mvvtDG0L5ft/jGWkLpG1+m0eQxOfaBvTNELraWj22wSVUMWxZUvYgJYcKh6jGGIkJFhH4IZPQhR4TKpc8mBw==} + engines: {node: '>=10'} + yargs@17.7.2: resolution: {integrity: sha512-7dSzzRQ++CKnNI/krKnYRV7JKKPUXMEh61soaHKg9mrWEhzFWhFnxPxGl+69cD1Ou63C13NUPCnmIcrvqCuM6w==} engines: {node: '>=12'} @@ -1163,6 +1281,10 @@ packages: resolution: {integrity: sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==} engines: {node: '>=6'} + yocto-queue@0.1.0: + resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==} + engines: {node: '>=10'} + snapshots: '@aws-crypto/crc32@5.2.0': @@ -2138,9 +2260,7 @@ snapshots: pg-protocol: 1.6.1 pg-types: 4.0.2 - '@types/ramda@0.30.2': - dependencies: - types-ramda: 0.30.1 + '@types/retry@0.12.2': {} '@types/semver@7.5.8': {} @@ -2150,6 +2270,8 @@ snapshots: acorn@8.12.1: {} + ansi-colors@4.1.3: {} + ansi-regex@5.0.1: {} ansi-styles@3.2.1: @@ -2182,10 +2304,16 @@ snapshots: balanced-match: 1.0.2 concat-map: 0.0.1 + brace-expansion@2.0.1: + dependencies: + balanced-match: 1.0.2 + braces@3.0.3: dependencies: fill-range: 7.1.1 + browser-stdout@1.3.1: {} + buffer@5.6.0: dependencies: base64-js: 1.5.1 @@ -2193,6 +2321,8 @@ snapshots: callsites@3.1.0: {} + camelcase@6.3.0: {} + chalk@2.4.2: dependencies: ansi-styles: 3.2.1 @@ -2216,6 +2346,12 @@ snapshots: optionalDependencies: fsevents: 2.3.3 + cliui@7.0.4: + dependencies: + string-width: 4.2.3 + strip-ansi: 6.0.1 + wrap-ansi: 7.0.0 + cliui@8.0.1: dependencies: string-width: 4.2.3 @@ -2247,14 +2383,28 @@ snapshots: create-require@1.1.1: {} + debug@4.3.6: + dependencies: + ms: 2.1.2 + debug@4.3.6(supports-color@5.5.0): dependencies: ms: 2.1.2 optionalDependencies: supports-color: 5.5.0 + debug@4.3.6(supports-color@8.1.1): + dependencies: + ms: 2.1.2 + optionalDependencies: + supports-color: 8.1.1 + + decamelize@4.0.0: {} + diff@4.0.2: {} + diff@5.2.0: {} + dotenv@16.4.5: {} emoji-regex@8.0.0: {} @@ -2294,6 +2444,8 @@ snapshots: escape-string-regexp@1.0.5: {} + escape-string-regexp@4.0.0: {} + events@3.3.0: {} fast-xml-parser@4.4.1: @@ -2304,10 +2456,14 @@ snapshots: dependencies: to-regex-range: 5.0.1 - fluture@14.0.0: + find-up@5.0.0: dependencies: - sanctuary-show: 2.0.0 - sanctuary-type-identifiers: 3.0.0 + locate-path: 6.0.0 + path-exists: 4.0.0 + + flat@5.0.2: {} + + fs.realpath@1.0.0: {} fsevents@2.3.3: optional: true @@ -2322,13 +2478,21 @@ snapshots: dependencies: is-glob: 4.0.3 + glob@8.1.0: + dependencies: + fs.realpath: 1.0.0 + inflight: 1.0.6 + inherits: 2.0.4 + minimatch: 5.1.6 + once: 1.4.0 + graphile-config@0.0.1-beta.9: dependencies: '@types/interpret': 1.1.3 '@types/node': 20.16.3 '@types/semver': 7.5.8 chalk: 4.1.2 - debug: 4.3.6(supports-color@5.5.0) + debug: 4.3.6 interpret: 3.1.1 semver: 7.6.3 tslib: 2.7.0 @@ -2356,6 +2520,8 @@ snapshots: has-flag@4.0.0: {} + he@1.2.0: {} + ieee754@1.2.1: {} ignore-by-default@1.0.1: {} @@ -2365,6 +2531,11 @@ snapshots: parent-module: 1.0.1 resolve-from: 4.0.0 + inflight@1.0.6: + dependencies: + once: 1.4.0 + wrappy: 1.0.2 + inherits@2.0.4: {} interpret@3.1.1: {} @@ -2383,8 +2554,14 @@ snapshots: dependencies: is-extglob: 2.1.1 + is-network-error@1.1.0: {} + is-number@7.0.0: {} + is-plain-obj@2.1.0: {} + + is-unicode-supported@0.1.0: {} + js-tokens@4.0.0: {} js-yaml@4.1.0: @@ -2397,14 +2574,52 @@ snapshots: lines-and-columns@1.2.4: {} + locate-path@6.0.0: + dependencies: + p-locate: 5.0.0 + + log-symbols@4.1.0: + dependencies: + chalk: 4.1.2 + is-unicode-supported: 0.1.0 + make-error@1.3.6: {} minimatch@3.1.2: dependencies: brace-expansion: 1.1.11 + minimatch@5.1.6: + dependencies: + brace-expansion: 2.0.1 + + mocha@10.7.3: + dependencies: + ansi-colors: 4.1.3 + browser-stdout: 1.3.1 + chokidar: 3.6.0 + debug: 4.3.6(supports-color@8.1.1) + diff: 5.2.0 + escape-string-regexp: 4.0.0 + find-up: 5.0.0 + glob: 8.1.0 + he: 1.2.0 + js-yaml: 4.1.0 + log-symbols: 4.1.0 + minimatch: 5.1.6 + ms: 2.1.3 + serialize-javascript: 6.0.2 + strip-json-comments: 3.1.1 + supports-color: 8.1.1 + workerpool: 6.5.1 + yargs: 16.2.0 + yargs-parser: 20.2.9 + yargs-unparser: 2.0.0 + ms@2.1.2: {} + ms@2.1.3: {} + nodemon@3.1.4: dependencies: chokidar: 3.6.0 @@ -2422,6 +2637,24 @@ snapshots: obuf@1.1.2: {} + once@1.4.0: + dependencies: + wrappy: 1.0.2 + + p-limit@3.1.0: + dependencies: + yocto-queue: 0.1.0 + + p-locate@5.0.0: + dependencies: + p-limit: 3.1.0 + + p-retry@6.2.0: + dependencies: + '@types/retry': 0.12.2 + is-network-error: 1.1.0 + retry: 0.13.1 + parent-module@1.0.1: dependencies: callsites: 3.1.0 @@ -2433,6 +2666,8 @@ snapshots: json-parse-even-better-errors: 2.3.1 lines-and-columns: 1.2.4 + path-exists@4.0.0: {} + path-type@4.0.0: {} pg-cloudflare@1.1.1: @@ -2510,7 +2745,9 @@ snapshots: pstree.remy@1.1.8: {} - ramda@0.30.1: {} + randombytes@2.1.0: + dependencies: + safe-buffer: 5.2.1 readable-stream@3.6.2: dependencies: @@ -2528,14 +2765,16 @@ snapshots: resolve-pkg-maps@1.0.0: {} + retry@0.13.1: {} + safe-buffer@5.2.1: {} - sanctuary-show@2.0.0: {} - - sanctuary-type-identifiers@3.0.0: {} - semver@7.6.3: {} + serialize-javascript@6.0.2: + dependencies: + randombytes: 2.1.0 + simple-update-notifier@2.0.0: dependencies: semver: 7.6.3 @@ -2561,6 +2800,8 @@ snapshots: dependencies: ansi-regex: 5.0.1 + strip-json-comments@3.1.1: {} + strnum@1.0.5: {} supports-color@5.5.0: @@ -2571,6 +2812,10 @@ snapshots: dependencies: has-flag: 4.0.0 + supports-color@8.1.1: + dependencies: + has-flag: 4.0.0 + to-regex-range@5.0.1: dependencies: is-number: 7.0.0 @@ -2595,8 +2840,6 @@ snapshots: v8-compile-cache-lib: 3.0.1 yn: 3.1.1 - ts-toolbelt@9.6.0: {} - tslib@2.7.0: {} tsx@4.19.0: @@ -2606,10 +2849,6 @@ snapshots: optionalDependencies: fsevents: 2.3.3 - types-ramda@0.30.1: - dependencies: - ts-toolbelt: 9.6.0 - typescript@5.5.4: {} undefsafe@2.0.5: {} @@ -2622,18 +2861,41 @@ snapshots: v8-compile-cache-lib@3.0.1: {} + workerpool@6.5.1: {} + wrap-ansi@7.0.0: dependencies: ansi-styles: 4.3.0 string-width: 4.2.3 strip-ansi: 6.0.1 + wrappy@1.0.2: {} + xtend@4.0.2: {} y18n@5.0.8: {} + yargs-parser@20.2.9: {} + yargs-parser@21.1.1: {} + yargs-unparser@2.0.0: + dependencies: + camelcase: 6.3.0 + decamelize: 4.0.0 + flat: 5.0.2 + is-plain-obj: 2.1.0 + + yargs@16.2.0: + dependencies: + cliui: 7.0.4 + escalade: 3.2.0 + get-caller-file: 2.0.5 + require-directory: 2.1.1 + string-width: 4.2.3 + y18n: 5.0.8 + yargs-parser: 20.2.9 + yargs@17.7.2: dependencies: cliui: 8.0.1 @@ -2645,3 +2907,5 @@ snapshots: yargs-parser: 21.1.1 yn@3.1.1: {} + + yocto-queue@0.1.0: {} diff --git a/services/factory/src/config.ts b/services/build/src/config.ts similarity index 92% rename from services/factory/src/config.ts rename to services/build/src/config.ts index 3412c8f..747ddc8 100644 --- a/services/factory/src/config.ts +++ b/services/build/src/config.ts @@ -14,6 +14,7 @@ if (!process.env.S3_REGION) throw new Error('Missing S3_REGION env var'); if (!process.env.S3_ENDPOINT) throw new Error('Missing S3_REGION env var'); if (!process.env.S3_MAIN_BUCKET) throw new Error('Missing S3_BUCKET env var'); if (!process.env.S3_USC_BUCKET) throw new Error('Missing S3_USC_BUCKET env var'); +if (!process.env.CACHE_DIR) throw new Error('Missing CACHE_DIR env var'); const postgrestUrl = process.env.POSTGREST_URL! const automationUserJwt = process.env.AUTOMATION_USER_JWT! const connectionString = process.env.WORKER_CONNECTION_STRING! @@ -23,6 +24,7 @@ const s3Endpoint = process.env.S3_ENDPOINT! const s3SecretAccessKey = process.env.S3_SECRET_ACCESS_KEY! const s3MainBucket = process.env.S3_MAIN_BUCKET! const s3UscBucket = process.env.S3_USC_BUCKET! +const cacheDir = process.env.CACHE_DIR! export interface Config { postgrestUrl: string; @@ -34,6 +36,7 @@ export interface Config { s3Endpoint: string; s3UscBucket: string; s3MainBucket: string; + cacheDir: string; } @@ -47,4 +50,5 @@ export const configs: Config = { s3Region, s3MainBucket, s3UscBucket, + cacheDir, } diff --git a/services/factory/src/index.ts b/services/build/src/index.ts similarity index 100% rename from services/factory/src/index.ts rename to services/build/src/index.ts diff --git a/services/factory/src/tasks/combine_video_segments.ts b/services/build/src/tasks/combine_video_segments.ts similarity index 56% rename from services/factory/src/tasks/combine_video_segments.ts rename to services/build/src/tasks/combine_video_segments.ts index ad53da9..8781c2d 100644 --- a/services/factory/src/tasks/combine_video_segments.ts +++ b/services/build/src/tasks/combine_video_segments.ts @@ -4,19 +4,20 @@ import { basename, join } from 'node:path'; import { S3Client } from "@aws-sdk/client-s3" import { Upload } from "@aws-sdk/lib-storage" import { createId } from '@paralleldrive/cuid2'; -// import { downloadS3File, uploadToS3, createStrapiB2File, createStrapiVod, createStrapiStream } from '@futureporn/storage' -// import { concatenateVideoSegments } from '@futureporn/video' import { execFile } from 'node:child_process'; import { configs } from '../config'; -import { pipeline, PassThrough, Readable } from 'node:stream'; -import { createReadStream, createWriteStream, write } from 'node:fs'; -import { writeFile, readFile } from 'node:fs/promises'; +import { PassThrough, Readable } from 'node:stream'; +import { createReadStream } from 'node:fs'; +import { stat, writeFile } from 'node:fs/promises'; +import { pipeline } from 'node:stream/promises'; import { tmpdir } from 'node:os'; -import { promisify } from 'node:util'; import patchVodInDatabase from '@futureporn/fetchers/patchVodInDatabase.ts' import { downloadFile } from '@futureporn/storage/s3.ts'; -import { S3FileRecord, VodRecord } from '@futureporn/types'; -const pipelinePromise = promisify(pipeline) +import { S3FileRecord, SegmentResponse } from '@futureporn/types'; +import getVod from '@futureporn/fetchers/getVod.ts'; +import { type S3ClientConfig } from "@aws-sdk/client-s3"; +import pRetry, {AbortError} from 'p-retry' +import { getFileChecksum, getTmpFile } from '@futureporn/utils/file.ts'; interface s3ManifestEntry { key: string; @@ -24,8 +25,8 @@ interface s3ManifestEntry { } interface Payload { - s3_manifest: s3ManifestEntry[]; vod_id?: string; + s3_manifest?: S3FileRecord[]; } interface S3Target { @@ -45,10 +46,16 @@ interface S3UploadParameters { function assertPayload(payload: any): asserts payload is Payload { if (typeof payload !== "object" || !payload) throw new Error("invalid payload"); - if (typeof payload.s3_manifest !== "object") throw new Error("invalid s3_manifest"); } - +async function withRetry(fn: () => Promise, retries = 3): Promise { + return pRetry(fn, { + onFailedAttempt: (e: Error) => { + console.error(`Error during attempt:`, e); + }, + retries + }); +} /** * @@ -143,17 +150,78 @@ const getS3ParallelUpload = async function ({ } +/** + * Checks if a file exists at the specified path. + */ +async function fileExists(path: string): Promise { + try { + await stat(path); + return true; + } catch { + return false; + } +} +/** + * Validates checksum if an expected checksum is provided. + */ +async function validateChecksumIfNeeded(filePath: string, segment: SegmentResponse) { + const expectedChecksum = segment?.checksum; + if (expectedChecksum) { + const actualChecksum = await getFileChecksum(filePath, 'md5'); + if (expectedChecksum !== actualChecksum) { + throw new Error(`Downloaded segment ${segment.id} but the expected checksum ${expectedChecksum} did not match actual ${actualChecksum}.`); + } + } +} -export const combine_video_segments: Task = async function (payload: unknown, helpers: Helpers) { - // helpers.logger.info('the following is the raw Task payload') - // helpers.logger.info(payload) - // helpers.logger.info(JSON.stringify(payload?.s3_manifest)) - assertPayload(payload) - const { s3_manifest, vod_id } = payload - if (!vod_id) throw new Error('combine_video_segments was called without a vod_id.'); - helpers.logger.info(`🏗️ combine_video_segments started with s3_manifest=${JSON.stringify(s3_manifest)}, vod_id=${vod_id}`) +/** + * downloadSegment + * + * - [x] If the file is already in local cache, that file is used. + * - [x] Validates checksum to ensure file is whole + */ +async function downloadSegment(client: S3Client, segment: SegmentResponse) { + if (!segment) throw new Error('segment passed to downloadSegment was missing'); + if (!segment.s3_key) throw new Error('segment passed to downloadSegment was missing s3_key'); + const cachePath = join(configs.cacheDir, segment.s3_key); + const isFileInCache = await fileExists(cachePath); + + // Check and return cache if available + if (isFileInCache) { + await validateChecksumIfNeeded(cachePath, segment); + return cachePath; + } + + // Download segment and validate checksum + const tmpFilePath = await withRetry(() => downloadFile(client, configs.s3UscBucket, segment.s3_key), 3); + await validateChecksumIfNeeded(tmpFilePath, segment); + + return tmpFilePath; +} + +/** + * downloadSegments + * + * Download a list of segments from S3. + + */ +async function downloadSegments(client: S3Client, segments: SegmentResponse[]) { + for (const segment of segments) { + const segmentFilePath = await downloadSegment(client, segment) + } +} + +/** + * doIntegratedCombine + * + * Integrated combine_video_segments is when the requester is giving us only a vod_id + * It's our job to inspect the vod and get it's segments and combine them + * Then upload the result to S3 and update the vod's s3_file + * + */ +async function doIntegratedCombine(helpers: Helpers, vod_id: string) { /** * Here we take a manifest of S3 files and we download each of them. * Then we combine them all, preserving original order using `ffmpeg -f concat` @@ -167,25 +235,44 @@ export const combine_video_segments: Task = async function (payload: unknown, he * we edit the stream record, adding a relation to the vod we just created. */ - try { + + const vod = await getVod(vod_id) + if (!vod) throw new Error('doIntegratedCombine failed to get vod'); - const client = new S3Client({ + // * [x] Get a list of segments associated with this vod. + // * [ ] Download the segments (or pull from disk cache) + // * [ ] VALIDATE segment checksums(?) + // * [ ] Combine segments using ffmpeg + // * [ ] Upload resulting video + // * [ ] Create s3_file with resulting video information + // * [ ] Update s3_file vod record to reference s3_file + + const options: S3ClientConfig = { endpoint: configs.s3Endpoint, region: configs.s3Region, credentials: { accessKeyId: configs.s3AccessKeyId, secretAccessKey: configs.s3SecretAccessKey } - }); + } + const client = new S3Client(options); + + if (!vod.segments) throw new Error('vod.segments was missing'); + const segments = vod.segments + + const localSegments = await downloadSegments(client, segments) + const s3Manifest = s3_manifest - const inputVideoFilePaths = await Promise.all(s3Manifest.filter((m) => (m.bytes !== 0)).map((m) => downloadFile(client, configs.s3UscBucket, m.key))) + + + const inputVideoFilePaths = await Promise.all(s3Manifest.filter((m: S3FileRecord) => (m.bytes !== 0)).map((m) => downloadFile(client, configs.s3UscBucket, m.key))) const concatenatedVideoFile = await concatVideos(inputVideoFilePaths) const s3KeyName = basename(concatenatedVideoFile) const inputStream = createReadStream(concatenatedVideoFile) const filePath = concatenatedVideoFile const { uploadStream, upload } = await getS3ParallelUpload({ client, s3KeyName, filePath }) - pipelinePromise(inputStream, uploadStream) + pipeline(inputStream, uploadStream) await upload.done() if (!vod_id) throw new Error('vod_id was missing from payload'); @@ -198,7 +285,7 @@ export const combine_video_segments: Task = async function (payload: unknown, he } catch (e: any) { - helpers.logger.error('combined_video_segments failed') + helpers.logger.error('combine_video_segments failed') if (e instanceof Error) { helpers.logger.error(e.message) } else { @@ -207,6 +294,42 @@ export const combine_video_segments: Task = async function (payload: unknown, he throw e } +} + + +/** + * doSoloRequest + * + * Solo combine_video_segments is when the requester is giving us a s3_manifest + * Which is just a S3FileRecord[] + * + * It's our job to download those files and combine them, then upload the result to S3 + */ +async function doSoloCombine(helpers: Helpers, s3_manifest: S3FileRecord[]) { + const s3Manifest = s3_manifest + const inputVideoFilePaths = await Promise.all(s3Manifest.filter((m) => (m.bytes !== 0)).map((m) => downloadFile(client, configs.s3UscBucket, m.key))) +} + + +export const combine_video_segments: Task = async function (payload: unknown, helpers: Helpers) { + // helpers.logger.info('the following is the raw Task payload') + // helpers.logger.info(payload) + // helpers.logger.info(JSON.stringify(payload?.s3_manifest)) + assertPayload(payload) + const { s3_manifest, vod_id } = payload + if (!vod_id) throw new Error('combine_video_segments was called without a vod_id.'); + helpers.logger.info(`🏗️ combine_video_segments started with s3_manifest=${JSON.stringify(s3_manifest)}, vod_id=${vod_id}`) + + const isSoloRequest = (!!vod_id && !s3_manifest) + const isIntegratedRequest = (!!s3_manifest?.length && !vod_id) + + if (isSoloRequest && !isIntegratedRequest) { + await doSoloCombine(helpers, s3_manifest!) + } else if (isIntegratedRequest && !isSoloRequest) { + await doIntegratedCombine(helpers, vod_id) + } else { + throw new Error(`Ambiguous request. Use either s3_manifest or vod_id argument, not both.`); + } } diff --git a/services/factory/src/tasks/generate_thumbnail.ts b/services/build/src/tasks/generate_thumbnail.ts similarity index 57% rename from services/factory/src/tasks/generate_thumbnail.ts rename to services/build/src/tasks/generate_thumbnail.ts index cf0bf5e..3f77ed0 100644 --- a/services/factory/src/tasks/generate_thumbnail.ts +++ b/services/build/src/tasks/generate_thumbnail.ts @@ -6,25 +6,21 @@ import patchVodInDatabase from "@futureporn/fetchers/patchVodInDatabase.ts"; import { configs } from "../config"; interface Payload { - vod_id: string; + vod_id?: string; + video_url?: string; } function assertPayload(payload: any): asserts payload is Payload { if (typeof payload !== "object" || !payload) throw new Error("invalid payload (it must be an object)"); - if (typeof payload.vod_id !== "string") throw new Error("payload.vod_id was not a string"); + if (typeof payload.vod_id !== "string" && typeof payload.video_url !== "string") throw new Error("payload requires either vod_id or video_url, however both were not a string."); } +async function doIntegratedRequest(vodId: string): Promise { -export const generate_thumbnail: Task = async function (payload: unknown, helpers: Helpers) { - assertPayload(payload) - const { vod_id } = payload - if (!vod_id) throw new Error('generate_thumbnail Task was started without a vod_id. This is unsupported.'); - helpers.logger.info(`🏗️ generate_thumbnail started with vod_id=${vod_id}`); - - const vod = await getVod(vod_id) + const vod = await getVod(vodId) const s3_file = vod?.s3_file - if (!s3_file) throw new Error(`vod ${vod_id} was missing a s3_file.`); + if (!s3_file) throw new Error(`vod ${vodId} was missing a s3_file.`); // we need to get a CDN url to the vod so we can download chunks of the file in order for Prevvy to create the storyboard image. const cdnUrl = getCdnUrl(configs.s3MainBucket, s3_file.s3_key) @@ -41,15 +37,48 @@ export const generate_thumbnail: Task = async function (payload: unknown, helper s3Endpoint: configs.s3Region, s3Region: configs.s3Region } + const upload = await uploadFile(uploadArgs) if (!upload) throw new Error(`failed to upload ${tmpImagePath} to S3`); if (!upload.Key) throw new Error(`failed to upload ${tmpImagePath} to S3 (upload.Key was missing)`); // we need to create a S3 file in the db const thumbnail = getCdnUrl(configs.s3MainBucket, upload.Key) - await patchVodInDatabase(vod_id, { thumbnail }) + await patchVodInDatabase(vodId, { thumbnail }) + + +} + +async function doSoloRequest(videoUrl: string): Promise { + return await getThumbnailUrl(videoUrl) +} + + +export const generate_thumbnail: Task = async function (payload: unknown, helpers: Helpers) { + assertPayload(payload) + const { vod_id, video_url } = payload + + helpers.logger.info(`🏗️ generate_thumbnail started with vod_id=${vod_id}, video_url=${video_url}`); + + + // Determine what kind of request is being made. + // It could be one of two scenarios + // * Here is a VOD record in the database, please update it's thumbnail. (integratedRequest) + // * Here is a video URL, please give us a thumbnail URL. (soloRequest) + // + const integratedRequest = (!!vod_id && !video_url) + const soloRequest = (!!video_url && !vod_id) + + if (integratedRequest) { + await doIntegratedRequest(vod_id) + } else if (soloRequest) { + await getThumbnailUrl(video_url) + } else { + throw new Error(`unsupported ambiguous request!`) + } + } export default generate_thumbnail \ No newline at end of file diff --git a/services/factory/src/tasks/process_video.ts b/services/build/src/tasks/process_video.ts similarity index 100% rename from services/factory/src/tasks/process_video.ts rename to services/build/src/tasks/process_video.ts diff --git a/services/factory/src/tasks/remux_video.ts b/services/build/src/tasks/remux_video.ts similarity index 100% rename from services/factory/src/tasks/remux_video.ts rename to services/build/src/tasks/remux_video.ts diff --git a/services/factory/src/utils/importDirectory.ts b/services/build/src/utils/importDirectory.ts similarity index 100% rename from services/factory/src/utils/importDirectory.ts rename to services/build/src/utils/importDirectory.ts diff --git a/services/factory/tsconfig.json b/services/build/tsconfig.json similarity index 100% rename from services/factory/tsconfig.json rename to services/build/tsconfig.json diff --git a/services/capture/pnpm-lock.yaml b/services/capture/pnpm-lock.yaml index 2f17bb3..8ca66dc 100644 --- a/services/capture/pnpm-lock.yaml +++ b/services/capture/pnpm-lock.yaml @@ -199,34 +199,6 @@ importers: specifier: ^5.5.4 version: 5.5.4 - ../..: {} - - ../../packages/fetchers: {} - - ../../packages/infra: {} - - ../../packages/storage: {} - - ../../packages/types: {} - - ../../packages/utils: {} - - ../bot: {} - - ../factory: {} - - ../mailbox: {} - - ../migrations: {} - - ../next: {} - - ../scout: {} - - ../strapi: {} - - ../uppy: {} - packages: '@aws-crypto/crc32@5.2.0': diff --git a/services/capture/src/RecordNextGeneration.ts b/services/capture/src/RecordNextGeneration.ts index 66a8bf3..d9380bb 100644 --- a/services/capture/src/RecordNextGeneration.ts +++ b/services/capture/src/RecordNextGeneration.ts @@ -3,83 +3,39 @@ */ import { VodResponse } from "@futureporn/types" -import getVod from '@futureporn/fetchers/getVod.ts' import { PassThrough, Readable, type Writable } from "stream" -import { tmpdir } from 'node:os' import { join } from 'node:path' import { ua0 } from '@futureporn/utils/name.ts' -import { spawn } from 'child_process' +import { ChildProcessByStdio, spawn } from 'child_process' import { pipeline } from "stream/promises" import { configs } from "./config" import { nanoid } from 'nanoid' import { Upload, type Progress } from "@aws-sdk/lib-storage" -import { S3Client, type S3ClientConfig } from '@aws-sdk/client-s3' +import { S3Client } from '@aws-sdk/client-s3' import prettyBytes from "pretty-bytes" import updateSegmentInDatabase from "@futureporn/fetchers/updateSegmentInDatabase.ts" import { getRecordingRelatedToVod } from "@futureporn/fetchers/getRecording.ts" import createSegment from "@futureporn/fetchers/createSegment.ts" -import createSegmentsVodLink from "@futureporn/fetchers/createSegmentsVodLink.ts" import { createReadStream, createWriteStream } from "fs" -import pRetry, {AbortError} from 'p-retry' +import pRetry from 'p-retry' import { type SegmentResponse } from '@futureporn/types' import getPlaylistUrl from "@futureporn/fetchers/getPlaylistUrl.ts" -import { isBefore, sub } from 'date-fns' +import { isBefore, isAfter, sub } from 'date-fns' +import { setTimeout } from 'node:timers/promises'; +import { + RoomOfflineError, + PlaylistFailedError, + AdminAbortedError, + ExhaustedRetriesError, +} from '@futureporn/utils/error.ts' export interface RecordNextGenerationArguments { vodId: string; url: string; } -export class AdminAbortedError extends Error { - constructor(message?: string) { - super(message) - Object.setPrototypeOf(this, AdminAbortedError.prototype) - this.name = this.constructor.name - this.message = `AdminAbortedError. ${this.message}` - } - getErrorMessage() { - return this.message - } -} -export class UploadFailedError extends Error { - constructor(message?: string) { - super(message) - Object.setPrototypeOf(this, UploadFailedError.prototype) - this.name = this.constructor.name - this.message = `UploadFailedError. ${this.message}` - } - getErrorMessage() { - return this.message - } -} - -export class PlaylistFailedError extends Error { - constructor(message?: string) { - super(message) - Object.setPrototypeOf(this, PlaylistFailedError.prototype) - this.name = this.constructor.name - this.message = `PlaylistFailedError. ${this.message}` - } - getErrorMessage() { - return this.message - } -} - - - -export class DownloadFailedError extends Error { - constructor(message?: string) { - super(message) - Object.setPrototypeOf(this, DownloadFailedError.prototype) - this.name = this.constructor.name - this.message = `DownloadFailedError. ${this.message}` - } - getErrorMessage() { - return this.message - } -} /** @@ -89,21 +45,51 @@ export class DownloadFailedError extends Error { * * ## Issues/TODO list * - * @todo [x] onProgress stops firing - * @todo [x] OOMKilled seen via development environment + * @done [x] onProgress stops firing + * @done [x] OOMKilled seen via development environment * @todo [ ] undefined behavior during CB private shows * @todo [ ] does not handle CB Hidden Camera ticket shows - * @todo [ ] Upload segments in a way that does not interrupt downloading new segments. + * @done [x] Upload segments in a way that does not interrupt downloading new segments. * There is an issue where a segment download ends, and the segment upload immediately begins. * At first glance this looks like good behavior, but what is happening during the segment upload is that the livestream * is continuing, but we aren't recording it anymore. We are using Backblaze, thus uploads are slow. * We miss a lot of the stream because the upload takes many minutes. * Instead of this behavior of immediately uploading after a segment download completes, we should upload once the livestream is finished, * OR we should upload while concurrently downloading the next segment. - * @todo [ ] Move retrying from the {Task} `record` context to the class `RecordNextGeneration` context. + * @done [x] Move retrying from the {Task} `record` context to the class `RecordNextGeneration` context. * There is an issue where the `record` task needs to retry after a temporary failure, but it cannot because there aren't any available workers. * The solution is to not exit the `record` task at all, and instead keep the `record` task running, but suspended while a exponential backoff timer elapses. * This way, the worker stays focused on the recording and retries until the stream has been offline for n minutes, at which point `record` is complete. + * @done [x] Abort process results in corrupted .ts files that ffmpeg/vlc/kdenlive cannot read. Granted, aborted vods are those which are NOT desirable to save, so maybe we ignore this issue? + * @done [x] RecordNextGeneration gives up immediately in response to RoomOffline. It must retry until 5 minutes have elapsed. + * @done [x] .bytes and .bytes_uploaded do not match at the end of the upload. .bytes_uploaded is curiously larger than .bytes! + * @todo [ ] Two jobs can occur concurrently (technically env var WORKER_CONCURRENCY allows for >1 so this isn't a bug) + * I set it to 1. Now we just need to test that only 1 job can happen at a time. + * + * ``` + * updateSegmentBytes() Segment 0 -- segment.id=2bf39450-2911-4f26-aca6-41cf092fd5e6 .bytes=6017952380 .bytes_uploaded=4571791360 .s3_key=MufqGsVF2hY6sAtN5rmrX.ts .vod.id=cbc80caf-73e5-41e5-8bbb-2d03ca234aca +updateSegmentBytes() Segment 0 -- segment.id=2bf39450-2911-4f26-aca6-41cf092fd5e6 .bytes=6017952380 .bytes_uploaded=4592762880 .s3_key=MufqGsVF2hY6sAtN5rmrX.ts .vod.id=cbc80caf-73e5-41e5-8bbb-2d03ca234aca +updateSegmentBytes() Segment 0 -- segment.id=2bf39450-2911-4f26-aca6-41cf092fd5e6 .bytes=6017952380 .bytes_uploaded=4613734400 .s3_key=MufqGsVF2hY6sAtN5rmrX.ts .vod.id=cbc80caf-73e5-41e5-8bbb-2d03ca234aca +updateSegmentBytes() Segment 0 -- segment.id=2bf39450-2911-4f26-aca6-41cf092fd5e6 .bytes=6017952380 .bytes_uploaded=4624220160 .s3_key=MufqGsVF2hY6sAtN5rmrX.ts .vod.id=cbc80caf-73e5-41e5-8bbb-2d03ca234aca +updateSegmentBytes() Segment 0 -- segment.id=2bf39450-2911-4f26-aca6-41cf092fd5e6 .bytes=6017952380 .bytes_uploaded=4645191680 .s3_key=MufqGsVF2hY6sAtN5rmrX.ts .vod.id=cbc80caf-73e5-41e5-8bbb-2d03ca234aca +during startProgressReports(), we encountered the following error. +TypeError: fetch failed + at node:internal/deps/undici/undici:13178:13 + at process.processTicksAndRejections (node:internal/process/task_queues:95:5) + at async updateSegmentInDatabase (/app/packages/fetchers/src/updateSegmentInDatabase.ts:29:15) + at async RecordNextGeneration.updateSegmentBytes (/app/services/capture/src/RecordNextGeneration.ts:342:21) + at async RecordNextGeneration.updateDatabaseRecords (/app/services/capture/src/RecordNextGeneration.ts:326:5) + at async Timeout._onTimeout (/app/services/capture/src/RecordNextGeneration.ts:352:9) { + [cause]: Error: getaddrinfo EAI_AGAIN postgrest.futureporn.svc.cluster.local + at GetAddrInfoReqWrap.onlookupall [as oncomplete] (node:dns:120:26) { + errno: -3001, + code: 'EAI_AGAIN', + syscall: 'getaddrinfo', + hostname: 'postgrest.futureporn.svc.cluster.local' + } +} +``` + * * */ export default class RecordNextGeneration { @@ -116,6 +102,7 @@ export default class RecordNextGeneration { public tmpDiskPath?: string; private vod?: VodResponse | null; private downloadStream?: Readable; + private downloadProcess?: ChildProcessByStdio; private uploadStream?: PassThrough; private uploadInstance?: Upload; private diskStream?: Writable; @@ -139,6 +126,7 @@ export default class RecordNextGeneration { this.abortController.signal.addEventListener("abort", this.abortEventListener.bind(this)) this.retries = 0 this.segments = [] + } async withRetry(fn: any, retries = 3) { @@ -153,61 +141,67 @@ export default class RecordNextGeneration { abortEventListener() { console.log(`abortEventListener has been invoked. this.abortSignal is as follows`) console.log(this.abortController.signal) - console.log(JSON.stringify(this.abortController.signal, null, 2)) - const reason = this.abortController.signal.reason - if (this.downloadStream) { - console.log(`aborted the stream download with reason=${reason}`) - this.downloadStream.destroy(new AdminAbortedError()) - } else { - console.warn(`downloadStream does not exist. Perhaps it has already been aborted?`) - } - } - - - getMultipartUpload({ - client, - bucket, - key, - body, - }: { - client: S3Client, - bucket: string, - key: string, - body: Readable, - }) { - const params = { - Bucket: bucket, - Key: key, - Body: body - } - const upload = new Upload({ - client, - partSize: 1024 * 1024 * 5, - queueSize: 1, - // @see https://github.com/aws/aws-sdk-js-v3/issues/2311 - // tl;dr: the variable name, 'leavePartsOnError' is not representative of the behavior. - // It should instead be interpreted as, 'throwOnPartsError' - leavePartsOnError: true, - params - }) - - /** - * aws client docs recommend against using async onProgress handlers. - * therefore, I'm only setting this.uploadCounter inside the syncronous handler and we call async updateSegmentInDatabase() elsewhere. - */ - const onProgress = (progress: Progress) => { - if (progress?.loaded) { - console.log(`Upload progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`) - this.reportMemoryUsage() - this.uploadCounter = progress.loaded + // console.log(JSON.stringify(this.abortController.signal, null, 2)) + // const reason = this.abortController.signal.reason + if (this.downloadProcess) { + // console.log(`aborted the stream process with reason=${reason}`) + // we want to send SIGINT to ffmpeg rather than forcefully destroying the stream. + // this prevents the downloaded .ts file from being corrupted. + // this.downloadStream.destroy(new AdminAbortedError()) + this.downloadProcess.kill('SIGINT'); + if (this.downloadStream) { + this.downloadStream.emit('error', new AdminAbortedError()); } + } else { + console.warn(`downloadProcess does not exist. Perhaps it has already been aborted?`) } - upload.on("httpUploadProgress", onProgress); - - return upload } + // getMultipartUpload({ + // client, + // bucket, + // key, + // body, + // }: { + // client: S3Client, + // bucket: string, + // key: string, + // body: Readable, + // }) { + // const params = { + // Bucket: bucket, + // Key: key, + // Body: body + // } + // const upload = new Upload({ + // client, + // partSize: 1024 * 1024 * 5, + // queueSize: 1, + // // @see https://github.com/aws/aws-sdk-js-v3/issues/2311 + // // tl;dr: the variable name, 'leavePartsOnError' is not representative of the behavior. + // // It should instead be interpreted as, 'throwOnPartsError' + // leavePartsOnError: true, + // params + // }) + + // /** + // * aws client docs recommend against using async onProgress handlers. + // * therefore, I'm only setting this.uploadCounter inside the syncronous handler and we call async updateSegmentInDatabase() elsewhere. + // */ + // const onProgress = (progress: Progress) => { + // if (progress?.loaded) { + // console.log(`Upload progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`) + // this.reportMemoryUsage() + // this.uploadCounter = progress.loaded + // } + // } + // upload.on("httpUploadProgress", onProgress); + + // return upload + // } + + // @todo there is a problem that results in COMPLETE LOSS OF SEGMENT DATA. // when the download stream closes before the upload stream, I think the upload stream gets cut off. // this means the upload isn't allowed to save and that means no data whatsoever gets put to S3. @@ -227,11 +221,12 @@ export default class RecordNextGeneration { static getDiskStream(s3Key?: string) { - const tmpDiskPath = join(tmpdir(), s3Key || `${nanoid()}.ts`) + const tmpDiskPath = join(configs.cacheDir, s3Key || `${nanoid()}.ts`) + console.log(`getDiskStream() tmpDiskPath=${tmpDiskPath}`) return createWriteStream(tmpDiskPath, { encoding: 'utf-8' }) } - static getFFmpegStream({ playlistUrl }: { playlistUrl: string }): Readable { + static getFFmpeg({ playlistUrl }: { playlistUrl: string }): ChildProcessByStdio { console.log(`getFFmpegStream using playlistUrl=${playlistUrl}`) const ffmpegProc = spawn('ffmpeg', [ '-headers', `"User-Agent: ${ua0}"`, @@ -247,14 +242,19 @@ export default class RecordNextGeneration { // ignoring stderr is important because if not, ffmpeg will fill that buffer and node will hang stdio: ['pipe', 'pipe', 'ignore'] }) - return ffmpegProc.stdout + return ffmpegProc } - onUploadProgress(progress: Progress) { - if (progress?.loaded) { - console.log(`Upload progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`) + + + onUploadProgress(segment: SegmentResponse, progress: Progress) { + // console.log('onUploadProgress() running now. progress as follows') + // console.log(progress) + // find the matching segment and update it's bytes_uploaded + const matchingSegment = this.segments.find((s) => s.id === segment.id) + if (progress?.loaded && matchingSegment) { + matchingSegment.bytes_uploaded = progress.loaded this.reportMemoryUsage() - this.uploadCounter = progress.loaded } } @@ -273,25 +273,9 @@ export default class RecordNextGeneration { if (mem.rss > 256000000) console.warn(`High memory usage! ${JSON.stringify(this.formatMemoryStats(mem))}`); } - // handleClosedStream() { - // if (this.downloadStream?.destroyed && this.uploadStream?.destroyed) { - // console.log(`both downloadStream and uploadStream are destroyed which suggests success.`) - // console.log(`we need to cleanly exit`) - // } - // else if (this.downloadStream?.destroyed && !this.uploadStream?.destroyed) { - // console.log(`downloadStream was destroyed but uploadStream was not, which suggests a download failure.`) - // console.log(`we need to do nothing and wait for the uploadStream to finish.`) - // } - // else if (!this.downloadStream?.destroyed && this.uploadStream?.destroyed) { - // console.log(`uploadStream was destroyed but downloadStream was not, which suggests an upload failure.`) - // console.log(`we need to throw immediately so record() can retry and start a new segment.`) - // throw new UploadFailedError() - // } - // } - getS3Client() { - const clientOptions: S3ClientConfig = { + const clientOptions: any = { endpoint: configs.s3Endpoint, region: configs.s3Region, credentials: { @@ -311,52 +295,34 @@ export default class RecordNextGeneration { getNames() { const s3Key = `${nanoid()}.ts` - const tmpDiskPath = join(tmpdir(), s3Key) + const tmpDiskPath = join(configs.cacheDir, s3Key) console.log(`tmpDiskPath=${tmpDiskPath}`) return { tmpDiskPath, s3Key } } - /** - * getDatabaseRecords - * - * Get records from the database - * * vod - * * segment - * * segment_vod_link - */ - // async getDatabaseRecords() { - // this.vod = await getVod(this.vodId) - // if (!this.vod) throw new Error(`RecordNextGeneration failed to fetch vod ${this.vodId}`); - // if (this.vod.recording.is_aborted) throw new AdminAbortedError(); - - - // if (!this.s3Key) throw new Error('getDatabaseRecords() requires this.s3Key, but it was undefined.'); - // const segmentId = await createSegment(this.s3Key, this.vodId) - // const segmentVodLinkId = await createSegmentsVodLink(this.vodId, this.segmentId) - - // if (!this.vod) throw new Error('after getDatabaseRecords() ran, this.vod was missing.'); - // if (!segmentId) throw new Error('after getDatabaseRecords() ran, segmentId was missing.'); - // if (!segmentVodLinkId) throw new Error('after getDatabaseRecords() ran, segmentVodLinkId was missing.'); - - // return { segmentId, segmentVodLinkId } - // } - static async _dl(url: string, s3Key: string) { - const playlistUrl = await getPlaylistUrl(url) - if (!playlistUrl) throw new PlaylistFailedError(); - const ffmpegStream = RecordNextGeneration.getFFmpegStream({ playlistUrl }) + const { data, error } = (await getPlaylistUrl(url)) + if (!data) throw new PlaylistFailedError(); + if (error) { + if (error === 'PlaylistFailedError') throw new PlaylistFailedError(); + if (error === 'RoomOfflineError') throw new RoomOfflineError(); + } + console.log(`_dl playlistUrl=${data.playlistUrl}`) + const ffmpegProcess = RecordNextGeneration.getFFmpeg({ playlistUrl: data.playlistUrl }) + const ffmpegStream = ffmpegProcess.stdout const diskStream = RecordNextGeneration.getDiskStream(s3Key) const streamPipeline = pipeline(ffmpegStream, diskStream) - return { + return { pipeline: streamPipeline, ffmpegStream, + ffmpegProcess, diskStream, } } static async _ul(client: S3Client, diskPath: string, key: string) { - const diskStream = createReadStream(diskPath, { encoding: 'utf-8' }) + const diskStream = createReadStream(diskPath) const params = { Bucket: configs.s3UscBucket, @@ -365,7 +331,7 @@ export default class RecordNextGeneration { } const uploadInstance = new Upload({ client, - partSize: 1024 * 1024 * 5, + partSize: 1024 * 1024 * 10, queueSize: 1, // @see https://github.com/aws/aws-sdk-js-v3/issues/2311 // tl;dr: the variable name, 'leavePartsOnError' is not representative of the behavior. @@ -380,62 +346,6 @@ export default class RecordNextGeneration { } } - async upload() { - const s3Client = this.s3Client - const s3Key = this.s3Key - const tmpDiskPath = this.tmpDiskPath - if (!s3Client) throw new Error('s3Client was missing during upload()'); - if (!s3Key) throw new Error('s3Key was missing during upload()'); - if (!tmpDiskPath) throw new Error('tmpDiskPath was missing during upload()'); - const fileStream = createReadStream(tmpDiskPath, { encoding: 'utf-8' }) - - this.uploadInstance = this.getMultipartUpload({ client: s3Client, bucket: configs.s3UscBucket, key: s3Key, body: fileStream }) - - await this.uploadInstance.done() - } - - - /** - * # handleExceptions - * - * We want to handle any exceptions that are thrown, so our process continues running. - * Ideally we know every failure scenario and we handle it graceefully. - * If we ever reach the default: case below, it's a bug and we need to patch it. - */ - handleExceptions (e: any, phase?: string) { - console.info(`handleExceptions is called during phase=${phase} with e.name=${e.name} e instanceof Error?=${e instanceof Error} e.message=${e.message}`) - - if (e instanceof Error && e.name === 'RoomOfflineError') { - // if the room is offline, we re-throw the RoomOfflineError so the recording gets retried - // we do this because the offline might be a temporary situation. - // e.g. streamer's computer bluescreened and they're coming back after they reboot. - // @todo try again immediately - - } else if (e instanceof Error && e.name === 'PlaylistFailedError') { - // sometimes @futureporn/scout fails to get the playlist URL. We want to immediately try again. - // @todo try again immediately - - } else if (e instanceof Error && e.name === 'AdminAbortedError') { - // An admin aborted the recording which means we don't want to retry recording. - // we return which causes the 'record' Task to be marked as successful. - console.log(`clear as day, that is an AdminAbortedError! ❤️`) - return - - } else if (e instanceof Error && e.name === 'DownloadFailedError') { - throw e - - } else if (e instanceof Error && e.message === 'no tomes available') { - console.error(`Received a 'no tomes available' error from S3 which ususally means they're temporarily overloaded.`) - throw e - - } else if (e instanceof Error && e.name === 'UploadFailedError') { - throw e - - } else { - console.error(`!!!!!!!!!!!!!! 🚩🚩🚩 handleExceptions did not find a known scenario which should probably never happen. Please patch the code to handle this scenario.`) - console.error((e instanceof Error) ? `(e instanceof Error)=${(e instanceof Error)}, e.message='${e.message}', e.name='${e.name}'` : JSON.stringify(e)) - } - } async updateDatabaseRecords() { await this.updateSegmentBytes() @@ -453,8 +363,10 @@ export default class RecordNextGeneration { async updateSegmentBytes() { for (const [index, segment] of this.segments.entries()) { if (segment.id) { - console.log(`updateSegmentBytes() Segment ${index} -- segment.id=${segment.id} segments.bytes=${segment.bytes}`) - await updateSegmentInDatabase({ segment_id: segment.id, fileSize: segment.bytes }) + console.log(`updateSegmentBytes() Segment ${index} -- segment.id=${segment.id} .bytes=${segment.bytes} .bytes_uploaded=${segment.bytes_uploaded} .s3_key=${segment.s3_key} .vod.id=${segment?.vod?.id}`) + const seg = await updateSegmentInDatabase({ segment_id: segment.id, bytes: segment.bytes, bytes_uploaded: segment.bytes_uploaded }) + let matchingSegment = this.segments.find((s) => s.id === seg.id) + if (matchingSegment) matchingSegment = seg // update the locally cached segment so update_at is accurate (used during isTryingDownload()) } } } @@ -480,14 +392,17 @@ export default class RecordNextGeneration { * There are always more tries unless the stream has been offline for greater than 5 minutes. */ isTryingDownload() { - const isSegmentPresent = (this.segments && this.segments?.length > 0) + + const isSegmentPresent = (!!this.segments && this.segments?.length > 0) + // console.log(`isTryingDownload() this.segments=${this.segments}, this.segments.length=${this.segments?.length}, isSegmentPresent=${isSegmentPresent} `); if (!isSegmentPresent) return true; const latestSegment = this.segments.at(-1) const hasUpdatedTimestamp = latestSegment?.updated_at if (!hasUpdatedTimestamp) throw new Error('latestSegment does not have an updated_at property'); const fiveMinsAgo = sub(new Date(), { minutes: 5 }) const lastUpdatedAt = latestSegment.updated_at - return (isBefore(lastUpdatedAt, fiveMinsAgo)) ? true : false; + console.log(`isTryingDownload fiveMinsAgo=${fiveMinsAgo.toISOString()}, lastUpdatedAt=${lastUpdatedAt}`) + return isAfter(lastUpdatedAt, fiveMinsAgo) } @@ -505,8 +420,14 @@ export default class RecordNextGeneration { async done() { this.startProgressReports(); try { + console.log(`>> downloadSegments() begin.`) await this.downloadSegments(); + console.log(`>> downloadsSegments() finished. ${this.segments.length} downloaded.`) + console.log(`waiting 30 seconds for downloadSegments() to close ffmpeg ...`) // @todo potential speed optimization is to proceed as soon as file is done being written to + await setTimeout(30*1000) + console.log(`>> uploadSegments() begin.`) await this.uploadSegments(); + console.log(`>> uploadSegments() finished.`) } catch (e) { console.error(`An error was encountered during done() function. This should not happen under nominal scenarios. This may be a bug; please investigate.`) throw e @@ -528,11 +449,13 @@ export default class RecordNextGeneration { if (!segment.id) { throw new Error(`Failed to createSegment(). segment.id was missing.`); } - console.log(`New segment created. @see http://localhost:9000/segments?id=eq.${segment.id}&select=*,vods(*,recordings(*))`) + console.log(`New segment created. @see http://localhost:9000/segments?id=eq.${segment.id}&select=*,vod:vods(*,recordings(*))`) this.segments.push(segment) - const { pipeline, ffmpegStream } = (await RecordNextGeneration._dl(this.url, s3_key)) + const { pipeline, ffmpegStream, ffmpegProcess } = (await RecordNextGeneration._dl(this.url, s3_key)) if (this.downloadStream) throw new Error(`If you see this error, there is a bug in your code. downloadSegment() tried to use this.downloadStream but it was already being used by some other part of the code. Please refactor so this.downloadStream is not used by more than one function at any given time.`); this.downloadStream = ffmpegStream + this.downloadProcess = ffmpegProcess + console.log(`setting downloadProcess as follows. ${ffmpegProcess}`) ffmpegStream.on('data', (data) => { let mySegment = this.segments.find((s) => s.id === segment.id) if (mySegment) { @@ -554,13 +477,28 @@ export default class RecordNextGeneration { */ async downloadSegments(): Promise { try { - await this.downloadSegment() + while (this.isTryingDownload()) { + // Backoff timer, in case of followup segments. + // if most recent segment was created greater than 1 minute ago, there is no timer. (immediate retry) + // else the retry timer is 30 seconds (wait before retrying) + const mostRecentSegmentCreationTimestamp = this.segments.at(-1)?.created_at + if (mostRecentSegmentCreationTimestamp) { + const thirtySecondsAgo = sub(new Date(), { seconds: 30 }) + const mostRecentSegmentCreation = new Date(mostRecentSegmentCreationTimestamp) + const isMostRecentSegmentCreatedGreaterThanOneMinuteAgo = isBefore(mostRecentSegmentCreation, thirtySecondsAgo) + if (isMostRecentSegmentCreatedGreaterThanOneMinuteAgo) console.log(`Waiting 30 seconds before next downloadSegment().`); + await setTimeout((isMostRecentSegmentCreatedGreaterThanOneMinuteAgo) ? 1000*30 : 0) + } + await this.downloadSegment() + } } catch (e) { if (e instanceof Error && e.name === 'RoomOfflineError') { // If the room is offline, then we want to retry immediately. // We do this because the offline room might be a temporary situation. // e.g. streamer's computer bluescreened and they're coming back after they reboot. // If the room has been offline for >5 minutes, then we consider the stream concluded and we return. + console.warn('Room is offline! ~~ lets try again, if appropriate.') + console.log(`isTryingDownload()=${this.isTryingDownload()}`) if (this.isTryingDownload()) { return this.downloadSegments() } else { @@ -568,7 +506,7 @@ export default class RecordNextGeneration { } } else if (e instanceof Error && e.name === 'PlaylistFailedError') { - // sometimes @futureporn/scout fails to get the playlist URL. We want to immediately try again. + console.error(`sometimes @futureporn/scout fails to get the playlist URL. We want to immediately try again.`) return this.downloadSegments() } else if (e instanceof Error && e.name === 'AdminAbortedError') { @@ -594,7 +532,7 @@ export default class RecordNextGeneration { */ async uploadSegments() { try { - for (const segment of this.segments) { + for (const segment of this.segments.filter((s) => s.bytes > 0)) { await this.uploadSegment(segment) } } catch (e) { @@ -605,79 +543,28 @@ export default class RecordNextGeneration { } async uploadSegment(segment: SegmentResponse) { - const diskPath = join(tmpdir(), segment.s3_key) + const diskPath = join(configs.cacheDir, segment.s3_key) const key = segment.s3_key const client = this.getS3Client() + await pRetry(async (attemptCount: number) => { console.log(`uploadSegment() attempt ${attemptCount}`) if (!this.s3Client) throw new Error('S3Client') const { uploadInstance } = (await RecordNextGeneration._ul(client, diskPath, key)) - uploadInstance.on('httpUploadProgress', () => this.onUploadProgress) + uploadInstance.on('httpUploadProgress', (progress: Progress) => this.onUploadProgress(segment, progress)) return uploadInstance.done() }, { onFailedAttempt: (e) => { - console.error(`failed to uploadSegment() with the following error. Retrying.`) + console.error(`failed to uploadSegment() with the following error. Retrying with ${e.retriesLeft} retries left.`) console.error(e) }, retries: 9 }) - + + const matchingSegment = this.segments.find((s) => s.id === segment.id) + if (matchingSegment?.bytes_uploaded && matchingSegment?.bytes) { + matchingSegment.bytes_uploaded = matchingSegment.bytes + } + } } - - -// async done_old() { - -// /** -// * Errors thrown inside the setTimeout callback will end up as an uncaughtException. -// * We handle those errors here to prevent node from exiting. -// */ -// process.on('uncaughtException', (e) => { -// this.stopProgressReports() -// console.log(`!!! 🚩 WE HAVE CAUGHT AN UNCAUGHT EXCEPTION. (This should never occur. This is probably a bug that needs to be fixed.) error as follows.`) -// console.log(e) -// process.exit(69) -// }) - -// try { -// const client = this.getS3Client() - - -// console.log(`>> 1. Segment downloading phase`) -// while (this.isTryingDownload()) { -// if (!this.databaseUpdateTimer) this.startProgressReports(); - -// const s3_key = `${nanoid()}.ts` -// const segment = await createSegment(s3_key, this.vodId) -// if (!segment.id) { -// console.log('the following is segment') -// console.log(segment) -// throw new Error(`received invalid segment from db fetch()`); -// } -// this.segments.push(segment) -// console.log(`New segment created. @see http://localhost:9000/segments?id=eq.${segment.id}&select=*,vods(*,recordings(*))`) - -// const { pipeline, ffmpegStream } = (await RecordNextGeneration._dl(this.url, s3_key)) -// this.downloadStream = ffmpegStream -// ffmpegStream.on('data', (data) => { -// let mSegment = this.segments.find((s) => s.id === segment.id) -// if (mSegment) { -// mSegment.bytes += data.length; -// } -// }) -// await pipeline -// } - -// console.log(`>> 2. Segment uploading phase`) - - - -// } catch (e) { -// this.stopProgressReports() -// return this.handleExceptions(e, '_dl()|_ul()') - - -// } -// } - -// } diff --git a/services/capture/src/config.ts b/services/capture/src/config.ts index e4426ec..f8a1fb5 100644 --- a/services/capture/src/config.ts +++ b/services/capture/src/config.ts @@ -9,6 +9,8 @@ const requiredEnvVars = [ 'S3_USC_BUCKET', 'POSTGREST_URL', 'AUTOMATION_USER_JWT', + 'CACHE_DIR', + 'WORKER_CONCURRENCY', ] as const; const getEnvVar = (key: typeof requiredEnvVars[number]): string => { @@ -28,6 +30,8 @@ export interface Config { s3Region: string; s3UscBucket: string; s3Endpoint: string; + cacheDir: string; + workerConcurrency: number; } @@ -40,4 +44,6 @@ export const configs: Config = { s3Region: getEnvVar('S3_REGION'), s3UscBucket: getEnvVar('S3_USC_BUCKET'), s3Endpoint: getEnvVar('S3_ENDPOINT'), + cacheDir: getEnvVar('CACHE_DIR'), + workerConcurrency: parseInt(getEnvVar('WORKER_CONCURRENCY')), } \ No newline at end of file diff --git a/services/capture/src/index.ts b/services/capture/src/index.ts index b45c0e3..7ff0112 100644 --- a/services/capture/src/index.ts +++ b/services/capture/src/index.ts @@ -3,7 +3,7 @@ import { build } from './app.ts' import 'dotenv/config' - +import { configs } from './config.ts' import { makeWorkerUtils, type WorkerUtils, Runner, RunnerOptions, run as graphileRun } from 'graphile-worker' import { join, dirname } from 'node:path'; import { fileURLToPath } from 'url'; @@ -17,11 +17,10 @@ const version = getPackageVersion(join(__dirname, '../package.json')) if (!process.env.FUNCTION) throw new Error(`FUNCTION env var was missing. FUNCTION env var must be either 'api' or 'worker'.`); if (!process.env.WORKER_CONNECTION_STRING) throw new Error(`WORKER_CONNECTION_STRING env var was missing`); const connectionString = process.env.WORKER_CONNECTION_STRING! -const concurrency = (process.env?.WORKER_CONCURRENCY) ? parseInt(process.env.WORKER_CONCURRENCY) : 1 const preset: GraphileConfig.Preset = { worker: { connectionString: process.env.WORKER_CONNECTION_STRING, - concurrentJobs: concurrency, + concurrentJobs: configs.workerConcurrency, fileExtensions: [".js", ".ts"], }, }; @@ -59,7 +58,7 @@ async function doRunWorker(workerUtils: WorkerUtils) { const runnerOptions: RunnerOptions = { preset, - concurrency, + concurrency: configs.workerConcurrency, taskDirectory: join(__dirname, 'tasks'), } diff --git a/services/migrations/migrations/00072_create-builds-table.sql b/services/migrations/migrations/00072_create-builds-table.sql new file mode 100644 index 0000000..92bd578 --- /dev/null +++ b/services/migrations/migrations/00072_create-builds-table.sql @@ -0,0 +1,33 @@ +-- builds table schema +CREATE TABLE api.builds ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + vod UUID NOT NULL REFERENCES api.vods(id), + task TEXT NOT NULL, + created_at timestamp(6) without time zone, + updated_at timestamp(6) without time zone +); + +-- roles & permissions for our backend automation user +GRANT all ON api.builds TO automation; +GRANT SELECT ON api.builds TO web_anon; + + +-- trigger function for starting the appropriate task when a new api.builds row is added +CREATE FUNCTION public.tg__add_build_job() RETURNS trigger + LANGUAGE plpgsql SECURITY DEFINER + SET search_path TO 'pg_catalog', 'public', 'pg_temp' + AS $$ + begin + PERFORM graphile_worker.add_job(NEW.task, json_build_object( + 'vod', NEW.vod + ), max_attempts := 6); + return NEW; + end; + $$; + + + +CREATE TRIGGER create_build + AFTER UPDATE ON api.builds + FOR EACH ROW + EXECUTE FUNCTION tg__add_build_job(); diff --git a/services/migrations/migrations/00073_update-builds-table-timestamps.sql b/services/migrations/migrations/00073_update-builds-table-timestamps.sql new file mode 100644 index 0000000..eb3f060 --- /dev/null +++ b/services/migrations/migrations/00073_update-builds-table-timestamps.sql @@ -0,0 +1,14 @@ + + + +ALTER TABLE api.builds + ALTER COLUMN created_at SET DEFAULT now(); + +ALTER TABLE api.builds + ALTER COLUMN updated_at SET DEFAULT now(); + +CREATE TRIGGER update_builds + BEFORE UPDATE ON api.builds + FOR EACH ROW + EXECUTE PROCEDURE moddatetime(updated_at); + diff --git a/services/migrations/migrations/00074_switch-after-update-to-after-insert.sql b/services/migrations/migrations/00074_switch-after-update-to-after-insert.sql new file mode 100644 index 0000000..30d0bfe --- /dev/null +++ b/services/migrations/migrations/00074_switch-after-update-to-after-insert.sql @@ -0,0 +1,7 @@ +-- fixing a mistake. I accidentally did AFTER UPDATE when I wanted AFTER INSERT. +DROP TRIGGER create_build ON api.builds; + +CREATE TRIGGER create_build + AFTER INSERT ON api.builds + FOR EACH ROW + EXECUTE FUNCTION tg__add_build_job(); diff --git a/services/migrations/migrations/00075_add-checksum-to-segments.sql b/services/migrations/migrations/00075_add-checksum-to-segments.sql new file mode 100644 index 0000000..609c92d --- /dev/null +++ b/services/migrations/migrations/00075_add-checksum-to-segments.sql @@ -0,0 +1,2 @@ +ALTER TABLE api.segments + ADD COLUMN checksum TEXT; \ No newline at end of file diff --git a/services/migrations/migrations/00076_builds-use-vod_id-instead-of-vod.sql b/services/migrations/migrations/00076_builds-use-vod_id-instead-of-vod.sql new file mode 100644 index 0000000..e0ec59e --- /dev/null +++ b/services/migrations/migrations/00076_builds-use-vod_id-instead-of-vod.sql @@ -0,0 +1,8 @@ + +-- for consistency, using vod_id instead of vod + +ALTER TABLE api.builds + DROP COLUMN vod; + +ALTER TABLE api.builds + ADD COLUMN vod_id UUID NOT NULL REFERENCES api.vods(id); \ No newline at end of file diff --git a/services/migrations/migrations/00077_builds-trigger-change-vod-to-vod_id.sql b/services/migrations/migrations/00077_builds-trigger-change-vod-to-vod_id.sql new file mode 100644 index 0000000..d52cf50 --- /dev/null +++ b/services/migrations/migrations/00077_builds-trigger-change-vod-to-vod_id.sql @@ -0,0 +1,22 @@ + +-- we've renamed api.builds.vod to api.builds.vod_id +DROP FUNCTION public.tg__add_build_job CASCADE; + + +CREATE FUNCTION public.tg__add_build_job() RETURNS trigger + LANGUAGE plpgsql SECURITY DEFINER + SET search_path TO 'pg_catalog', 'public', 'pg_temp' + AS $$ + begin + PERFORM graphile_worker.add_job(NEW.task, json_build_object( + 'vod_id', NEW.vod_id + ), max_attempts := 6); + return NEW; + end; + $$; + + +CREATE TRIGGER create_build + AFTER UPDATE ON api.builds + FOR EACH ROW + EXECUTE FUNCTION tg__add_build_job(); diff --git a/services/migrations/migrations/00078_add-bytes_uploaded-to-segments.sql b/services/migrations/migrations/00078_add-bytes_uploaded-to-segments.sql new file mode 100644 index 0000000..32752b4 --- /dev/null +++ b/services/migrations/migrations/00078_add-bytes_uploaded-to-segments.sql @@ -0,0 +1,2 @@ +ALTER TABLE api.segments + ADD COLUMN bytes_uploaded BIGINT DEFAULT 0; \ No newline at end of file diff --git a/services/scout/src/fastify.ts b/services/scout/src/fastify.ts index 8bf72c0..2ae1e58 100644 --- a/services/scout/src/fastify.ts +++ b/services/scout/src/fastify.ts @@ -12,7 +12,7 @@ import scrapeVtuberData from './scrapeVtuberData.ts' import { getPlaylistUrl } from './ytdlp.ts' import { getRandomRoom } from './cb.ts' import { getPackageVersion } from '@futureporn/utils/file.ts' - +import { type GenericApiResponse } from '@futureporn/types' type VtuberDataRequest = FastifyRequest<{ Querystring: { url: string } @@ -90,7 +90,7 @@ fastify.get('/ytdlp/playlist-url', { querystring: { type: 'object', properties: { - url: { + playlistUrl: { type: 'string' } } @@ -100,7 +100,7 @@ fastify.get('/ytdlp/playlist-url', { error: { type: 'boolean' }, message: { type: 'string' }, data: { type: 'object', properties: { - url: { type: 'string' } + playlistUrl: { type: 'string' } }} } }, @@ -108,9 +108,15 @@ fastify.get('/ytdlp/playlist-url', { } }, async (req: VtuberDataRequest, reply) => { try { - const playlistUrl = await getPlaylistUrl(req.query.url) - console.log(`playlistUrl=${playlistUrl}`) - reply.type('application/json').send(JSON.stringify({ data: { url: playlistUrl } })) + const { data, error, message } = await getPlaylistUrl(req.query.url) + console.log(`playlistUrl=${data.playlistUrl}`) + reply.type('application/json').send(JSON.stringify({ + data: { + playlistUrl: data.playlistUrl + }, + error: error, + message: message + })) } catch (e) { reply.type('application/json').send(JSON.stringify({ data: null, error: e })) } diff --git a/services/scout/src/ytdlp.ts b/services/scout/src/ytdlp.ts index dfbb4a1..dfb046b 100644 --- a/services/scout/src/ytdlp.ts +++ b/services/scout/src/ytdlp.ts @@ -1,36 +1,14 @@ import spawnWrapper from './spawnWrapper.ts' import 'dotenv/config' import { configs } from './config.ts' +import { type GenericApiResponse } from '@futureporn/types' +import { ExhaustedRetriesError, RoomOfflineError } from '@futureporn/utils/error.ts' const maxRetries = 3 -export class ExhaustedRetriesError extends Error { - constructor(message?: string) { - super(message) - Object.setPrototypeOf(this, ExhaustedRetriesError.prototype) - this.name = this.constructor.name - this.message = `ExhaustedRetries: We retried the request the maximum amount of times. maxRetries of ${maxRetries} was reached.` - } - getErrorMessage() { - return this.message - } -} - -export class RoomOfflineError extends Error { - constructor(message?: string) { - super(message) - Object.setPrototypeOf(this, RoomOfflineError.prototype) - this.name = this.constructor.name - this.message = `RoomOffline. ${this.message}` - } - getErrorMessage() { - return this.message - } -} - -export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = 0): Promise { +export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = 0): Promise { console.log(`getPlaylistUrl roomUrl=${roomUrl} proxy=${false} retries=${retries}`) let args = ['-4', '-g', roomUrl] if (proxy) { @@ -49,7 +27,7 @@ export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = else throw new ExhaustedRetriesError() } else if (code === 0 && output.match(/https:\/\/.*\.m3u8/)) { // this must be an OK result with a playlist - return output.trim() + return { data: { playlistUrl: output.trim() }, error: null, message: 'we got a playlistUrl' } } else if (code === 1 && output.match(/Room is currently offline/)) { throw new RoomOfflineError() } else {