@futureporn/scout retries

This commit is contained in:
CJ_Clippy 2024-10-02 09:38:24 -08:00
parent 5af878c3a3
commit 4c71649c76
40 changed files with 1012 additions and 501 deletions

View File

@ -279,10 +279,10 @@ cmd_button('pgadmin4:restore',
icon_name='hub',
text='import connection',
)
cmd_button('factory:test',
argv=['./scripts/factory-test.sh'],
resource='factory',
icon_name='factory',
cmd_button('build:test',
argv=['./scripts/build-test.sh'],
resource='build',
icon_name='build',
text='test',
)
@ -314,12 +314,12 @@ docker_build(
)
docker_build(
'fp/factory',
'fp/build',
'.',
dockerfile='./dockerfiles/factory.dockerfile',
dockerfile='./dockerfiles/build.dockerfile',
target='dev',
live_update=[
sync('./services/factory', '/app/services/factory')
sync('./services/build', '/app/services/build')
],
pull=False,
)
@ -465,7 +465,7 @@ k8s_resource(
labels=['backend'],
)
k8s_resource(
workload='factory',
workload='build',
resource_deps=['postgrest'],
labels=['backend'],
)
@ -521,12 +521,6 @@ k8s_resource(
labels=['backend'],
resource_deps=['postgrest'],
)
# k8s_resource(
# workload='capture-api',
# port_forwards=['5003'],
# labels=['backend'],
# resource_deps=['postgrest', 'postgresql-primary'],
# )
k8s_resource(
workload='capture-worker',
labels=['backend'],

View File

@ -3,23 +3,26 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: factory
name: build
namespace: futureporn
labels:
app.kubernetes.io/name: factory
app.kubernetes.io/name: build
spec:
replicas: {{ .Values.factory.replicas }}
replicas: {{ .Values.build.replicas }}
selector:
matchLabels:
app: factory
app: build
template:
metadata:
labels:
app: factory
app: build
spec:
containers:
- name: factory
image: "{{ .Values.factory.imageName }}"
- name: build
image: "{{ .Values.build.imageName }}"
volumeMounts:
- name: capture-worker-cache
mountPath: "{{ .Values.capture.cache.dir }}"
env:
- name: WORKER_CONNECTION_STRING
valueFrom:
@ -35,6 +38,8 @@ spec:
value: "{{ .Values.postgrest.url }}"
- name: SCOUT_URL
value: "{{ .Values.scout.url }}"
- name: CACHE_DIR
value: "{{ .Values.capture.cache.dir }}"
- name: S3_ENDPOINT
value: "{{ .Values.s3.endpoint }}"
- name: S3_REGION
@ -59,3 +64,7 @@ spec:
memory: 1Gi
restartPolicy: Always
volumes:
- name: capture-worker-cache
persistentVolumeClaim:
claimName: capture-worker-cache-pvc

View File

@ -1,5 +1,18 @@
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: capture-worker-cache-pvc
namespace: futureporn
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: {{ .Values.capture.cache.size }}
storageClassName: {{ .Values.storageClassName }}
---
@ -20,9 +33,26 @@ spec:
labels:
app: capture-worker
spec:
# IDK if I need this initContainer.
# initContainers:
# - name: capture-worker-init
# image: busybox:latest
# command:
# - "/bin/mkdir"
# args:
# - "-p"
# - "/var/cache/taco-test"
# - "/tmp/test1"
# - "/test123"
# volumeMounts:
# - name: capture-worker-cache
# mountPath: "{{ .Values.capture.cache.dir }}"
containers:
- name: capture-worker
image: "{{ .Values.capture.imageName }}"
volumeMounts:
- name: capture-worker-cache
mountPath: "{{ .Values.capture.cache.dir }}"
env:
# - name: NODE_DEBUG
# value: "stream.onWriteComplete"
@ -30,6 +60,8 @@ spec:
value: "{{ .Values.scout.url }}"
- name: FUNCTION
value: worker
- name: WORKER_CONCURRENCY
value: "1"
- name: WORKER_CONNECTION_STRING
valueFrom:
secretKeyRef:
@ -47,8 +79,8 @@ spec:
key: httpProxy
- name: POSTGREST_URL
value: "{{ .Values.postgrest.url }}"
- name: PORT
value: "{{ .Values.capture.api.port }}"
- name: CACHE_DIR
value: "{{ .Values.capture.cache.dir }}"
- name: S3_ENDPOINT
value: "{{ .Values.s3.endpoint }}"
- name: S3_REGION
@ -70,4 +102,7 @@ spec:
cpu: 250m
memory: 1024Mi
restartPolicy: Always
volumes:
- name: capture-worker-cache
persistentVolumeClaim:
claimName: capture-worker-cache-pvc

View File

@ -24,18 +24,18 @@ capture:
imageName: fp/capture
worker:
replicas: 1
api:
port: 5003
replicas: 1
cache:
size: 20Gi
dir: /var/cache/capture-worker
mailbox:
imageName: fp/mailbox
replicas: 1
cdnBucketUrl: https://fp-dev.b-cdn.net
s3BucketName: fp-dev
port: 5000
factory:
build:
replicas: 1
imageName: fp/factory
imageName: fp/build
strapi:
replicas: 1
imageName: fp/strapi

View File

@ -1,7 +1,7 @@
## d.factory.dockerfile
## d.build.dockerfile
##
## @futureporn/factory is the system component which processes video segments into a VOD.
## Factory does tasks such as thumbnail generation, video encoding, file transfers, strapi record creation, etc.
## @futureporn/build is the system component which processes video segments into a VOD.
## Build does tasks such as thumbnail generation, video encoding, file transfers, strapi record creation, etc.
FROM node:20 AS base
@ -15,7 +15,7 @@ ENTRYPOINT ["pnpm"]
FROM base AS install
WORKDIR /app
RUN mkdir -p /app/services/factory && mkdir -p /prod/factory
RUN mkdir -p /app/services/build && mkdir -p /prod/build
## Copy manfiests, lockfiles, and configs into docker context
COPY package.json pnpm-lock.yaml .npmrc .
@ -23,7 +23,7 @@ COPY ./packages/utils/pnpm-lock.yaml ./packages/utils/package.json ./packages/ut
COPY ./packages/fetchers/package.json ./packages/fetchers/pnpm-lock.yaml ./packages/fetchers/
COPY ./packages/storage/pnpm-lock.yaml ./packages/storage/package.json ./packages/storage/
COPY ./packages/types/pnpm-lock.yaml ./packages/types/package.json ./packages/types/
COPY ./services/factory/pnpm-lock.yaml ./services/factory/package.json ./services/factory/
COPY ./services/build/pnpm-lock.yaml ./services/build/package.json ./services/build/
## Install npm packages
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch
@ -37,7 +37,7 @@ RUN ls -la /app/packages/utils/node_modules/prevvy/
RUn cat ./packages/utils/package.json
COPY ./packages/storage/ ./packages/storage/
COPY ./packages/types/ ./packages/types/
COPY ./services/factory/ ./services/factory/
COPY ./services/build/ ./services/build/
# we are grabbing the mp4 files from capture so we can run tests with them
COPY ./services/capture/src/fixtures ./services/capture/src/fixtures
@ -49,15 +49,15 @@ RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm -r build
## Copy all production code into one place
## `pnpm deploy` copies all dependencies into an isolated node_modules directory inside the target dir
## @see https://pnpm.io/cli/deploy
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm deploy --filter=@futureporn/factory --prod /prod/factory
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm deploy --filter=@futureporn/build --prod /prod/build
FROM install AS dev
WORKDIR /app/services/factory
WORKDIR /app/services/build
RUN ls -lash
CMD ["run", "dev"]
FROM base AS factory
COPY --from=build /prod/factory .
FROM base AS prod
COPY --from=build /prod/build .
RUN ls -la .
CMD ["start"]

View File

@ -8,7 +8,7 @@ export default async function createSegment(s3_key: string, vod_id: string): Pro
s3_key,
vod_id
}
const res = await fetch(`${configs.postgrestUrl}/segments`, {
const res = await fetch(`${configs.postgrestUrl}/segments?select=*,vod:vods(*,recordings(*))`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',

View File

@ -1,21 +1,23 @@
import { configs } from './config.ts'
import { type GenericApiResponse } from '@futureporn/types'
export default async function getPlaylistUrl (url: string): Promise<string|null> {
export default async function getPlaylistUrl (url: string): Promise<GenericApiResponse> {
if (!url) throw new Error(`getPlaylistUrl requires a url, but it was undefined.`);
const res = await fetch(`${configs.scoutUrl}/ytdlp/playlist-url?url=${url}`)
if (!res.ok) {
const body = await res.text()
console.error(`failed to getPlaylistUrl res.status=${res.status}, res.statusText=${res.statusText}, body=${body}`)
return null
} else {
const data = await res.json() as any
console.log(`>>>>>> getPlaylistUrl got a data payload as follows`)
console.log(data)
if (!!data.error) {
return null;
} else {
return data.data.url
return {
error: 'PlaylistFailedError',
detail: `failed to getPlaylistUrl. res.status=${res.status}, res.statusText=${res.statusText}, body=${body}`,
data: null,
message: 'something went wrong wile fetching data from @futureporn/scout'
}
} else {
const payload = await res.json() as any
console.log(`>>>>>> getPlaylistUrl data=${payload.data}, error=${payload.error} got a data payload as follows.`)
console.log(payload)
return payload
}
}

View File

@ -11,14 +11,17 @@ import { configs } from './config.ts'
*/
export default async function updateSegmentInDatabase({
segment_id,
fileSize,
bytes,
bytes_uploaded,
}: {
segment_id: string,
fileSize: number,
bytes: number,
bytes_uploaded: number,
}): Promise<SegmentResponse> {
const payload: any = {
bytes: fileSize
bytes,
bytes_uploaded
}
const fetchUrl =`${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=vod:vods(recording:recordings(is_aborted))`

View File

@ -1,6 +1,6 @@
import { S3Client, GetObjectCommand, type S3ClientConfig } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import type { S3FileResponse, S3FileRecord, Stream } from '@futureporn/types';
import type { S3FileResponse, S3FileRecord, StreamResponse } from '@futureporn/types';
import { createId } from '@paralleldrive/cuid2';
import { basename } from 'node:path';
import fs, { createWriteStream, createReadStream } from 'node:fs';

View File

@ -7,6 +7,15 @@ export type ProcessingState = 'processing'
export type WaitingState = 'pending_recording'
export type Status = Partial<WaitingState | ProcessingState | RecordingState>
// @see https://www.baeldung.com/rest-api-error-handling-best-practices
export interface GenericApiResponse {
error: string|null;
data: any;
message: string;
detail?: string;
help?: string;
}
export interface S3FileRecord {
s3_key: string;
s3_id?: string;
@ -226,6 +235,8 @@ export interface SegmentResponse {
s3_key: string;
s3_id: string;
bytes: number;
bytes_uploaded: number;
checksum: string;
vod?: VodResponse;
created_at: string;
updated_at: string;

View File

@ -0,0 +1,76 @@
export class ExhaustedRetriesError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, ExhaustedRetriesError.prototype)
this.name = this.constructor.name
this.message = `ExhaustedRetries: We retried the request the maximum amount of times.`
}
getErrorMessage() {
return this.message
}
}
export class RoomOfflineError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, RoomOfflineError.prototype)
this.name = this.constructor.name
this.message = `RoomOffline. ${this.message}`
}
getErrorMessage() {
return this.message
}
}
export class AdminAbortedError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, AdminAbortedError.prototype)
this.name = this.constructor.name
this.message = `AdminAbortedError. ${this.message}`
}
getErrorMessage() {
return this.message
}
}
export class UploadFailedError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, UploadFailedError.prototype)
this.name = this.constructor.name
this.message = `UploadFailedError. ${this.message}`
}
getErrorMessage() {
return this.message
}
}
export class PlaylistFailedError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, PlaylistFailedError.prototype)
this.name = this.constructor.name
this.message = `PlaylistFailedError. ${this.message}`
}
getErrorMessage() {
return this.message
}
}
export class DownloadFailedError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, DownloadFailedError.prototype)
this.name = this.constructor.name
this.message = `DownloadFailedError. ${this.message}`
}
getErrorMessage() {
return this.message
}
}

View File

@ -1,10 +1,11 @@
import { getTmpFile, download, getPackageVersion } from './file.ts'
import { getTmpFile, download, getPackageVersion, getFileChecksum } from './file.ts'
import { expect } from 'chai'
import { describe } from 'mocha'
import { dirname, basename, join, isAbsolute } from 'node:path';
import { fileURLToPath } from 'url';
export const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(fileURLToPath(import.meta.url));
const fixtureImage0 = join(__dirname, './fixtures/sample.webp')
describe('file', function () {
describe('integration', function () {
@ -30,5 +31,11 @@ describe('file', function () {
expect(getPackageVersion('../package.json')).to.match(/\d+\.\d+\.\d+/)
})
})
describe('getFileChecksum', function () {
it('should get a MD5sum of a file', async function () {
const checksum = await getFileChecksum(fixtureImage0)
expect(checksum).to.equal('a1fe79a39fa6e63d0faca57527792823')
})
})
})
})

View File

@ -5,6 +5,7 @@ import { Readable } from 'stream';
import { finished } from 'stream/promises';
import { dirname, basename, join, isAbsolute } from 'node:path';
import { fileURLToPath } from 'url';
import { createHash } from 'node:crypto';
export const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(fileURLToPath(import.meta.url));
const ua0 = 'Mozilla/5.0 (X11; Linux x86_64; rv:105.0) Gecko/20100101 Firefox/105.0'
@ -77,4 +78,20 @@ export async function download({
return filePath;
}
export const tmpFileRegex = /^\/tmp\/.*\.jpg$/;
export const tmpFileRegex = /^\/tmp\/.*\.jpg$/;
/**
* getFileChecksum
* greetz https://stackoverflow.com/a/44643479/1004931
*/
export async function getFileChecksum(filePath: string, algorithm = 'md5') {
return new Promise((resolve, reject) => {
const hash = createHash(algorithm);
const stream = fs.createReadStream(filePath);
stream.on('error', err => reject(err));
stream.on('data', chunk => hash.update(chunk));
stream.on('end', () => resolve(hash.digest('hex')));
});
}

View File

@ -30,7 +30,7 @@ describe('image', function () {
describe('getStoryboard', function () {
this.timeout(1000*60*15)
it('should accept a URL and return a path to image on disk', async function () {
const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-09-25.mp4'
const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-09-27.mp4'
const imagePath = await getStoryboard(url)
expect(imagePath).to.match(/\.png/)
})

View File

@ -22,4 +22,4 @@ curl -sL -H "Authorization: Bearer ${AUTOMATION_USER_JWT}" \
-H "Content-Type: application/json" \
-d '{"url": "'"${url}"'"}' \
http://localhost:9000/recordings
echo "finished creating recording"
echo "recording created"

View File

@ -1,7 +1,7 @@
{
"name": "@futureporn/factory",
"name": "@futureporn/build",
"type": "module",
"version": "1.0.0",
"version": "2.0.0",
"description": "",
"main": "src/index.ts",
"scripts": {
@ -15,7 +15,9 @@
"transcode",
"transcoding",
"process",
"processing"
"processing",
"build",
"factory"
],
"author": "@cj_clippy",
"license": "Unlicense",
@ -28,13 +30,12 @@
"@paralleldrive/cuid2": "^2.2.2",
"@types/node": "^22.5.2",
"dotenv": "^16.4.5",
"fluture": "^14.0.0",
"graphile-worker": "^0.16.6",
"ramda": "^0.30.1"
"p-retry": "^6.2.0"
},
"devDependencies": {
"@futureporn/types": "workspace:^",
"@types/ramda": "^0.30.2",
"mocha": "^10.7.3",
"nodemon": "^3.1.4",
"ts-node": "^10.9.2",
"tsx": "^4.19.0"

View File

@ -32,22 +32,19 @@ importers:
dotenv:
specifier: ^16.4.5
version: 16.4.5
fluture:
specifier: ^14.0.0
version: 14.0.0
graphile-worker:
specifier: ^0.16.6
version: 0.16.6(typescript@5.5.4)
ramda:
specifier: ^0.30.1
version: 0.30.1
p-retry:
specifier: ^6.2.0
version: 6.2.0
devDependencies:
'@futureporn/types':
specifier: workspace:^
version: link:../../packages/types
'@types/ramda':
specifier: ^0.30.2
version: 0.30.2
mocha:
specifier: ^10.7.3
version: 10.7.3
nodemon:
specifier: ^3.1.4
version: 3.1.4
@ -642,8 +639,8 @@ packages:
'@types/pg@8.11.8':
resolution: {integrity: sha512-IqpCf8/569txXN/HoP5i1LjXfKZWL76Yr2R77xgeIICUbAYHeoaEZFhYHo2uDftecLWrTJUq63JvQu8q3lnDyA==}
'@types/ramda@0.30.2':
resolution: {integrity: sha512-PyzHvjCalm2BRYjAU6nIB3TprYwMNOUY/7P/N8bSzp9W/yM2YrtGtAnnVtaCNSeOZ8DzKyFDvaqQs7LnWwwmBA==}
'@types/retry@0.12.2':
resolution: {integrity: sha512-XISRgDJ2Tc5q4TRqvgJtzsRkFYNJzZrhTdtMoGVBttwzzQJkPnS3WWTFc7kuDRoPtPakl+T+OfdEUjYJj7Jbow==}
'@types/semver@7.5.8':
resolution: {integrity: sha512-I8EUhyrgfLrcTkzV3TSsGyl1tSuPrEDzr0yd5m90UgNxQkyDXULk3b6MlQqTCpZpNtWe1K0hzclnZkTcLBe2UQ==}
@ -657,6 +654,10 @@ packages:
engines: {node: '>=0.4.0'}
hasBin: true
ansi-colors@4.1.3:
resolution: {integrity: sha512-/6w/C21Pm1A7aZitlI5Ni/2J6FFQN8i1Cvz3kHABAAbw93v/NlvKdVOqz7CCWz/3iv/JplRSEEZ83XION15ovw==}
engines: {node: '>=6'}
ansi-regex@5.0.1:
resolution: {integrity: sha512-quJQXlTSUGL2LH9SUXo8VwsY4soanhgo6LNSm84E1LBcE8s3O0wpdiRzyR9z/ZZJMlMWv37qOOb9pdJlMUEKFQ==}
engines: {node: '>=8'}
@ -695,10 +696,16 @@ packages:
brace-expansion@1.1.11:
resolution: {integrity: sha512-iCuPHDFgrHX7H2vEI/5xpz07zSHB00TpugqhmYtVmMO6518mCuRMoOYFldEBl0g187ufozdaHgWKcYFb61qGiA==}
brace-expansion@2.0.1:
resolution: {integrity: sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==}
braces@3.0.3:
resolution: {integrity: sha512-yQbXgO/OSZVD2IsiLlro+7Hf6Q18EJrKSEsdoMzKePKXct3gvD8oLcOQdIzGupr5Fj+EDe8gO/lxc1BzfMpxvA==}
engines: {node: '>=8'}
browser-stdout@1.3.1:
resolution: {integrity: sha512-qhAVI1+Av2X7qelOfAIYwXONood6XlZE/fXaBSmW/T5SzLAmCgzi+eiWE7fUvbHaeNBQH13UftjpXxsfLkMpgw==}
buffer@5.6.0:
resolution: {integrity: sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==}
@ -706,6 +713,10 @@ packages:
resolution: {integrity: sha512-P8BjAsXvZS+VIDUI11hHCQEv74YT67YUi5JJFNWIqL235sBmjX4+qx9Muvls5ivyNENctx46xQLQ3aTuE7ssaQ==}
engines: {node: '>=6'}
camelcase@6.3.0:
resolution: {integrity: sha512-Gmy6FhYlCY7uOElZUSbxo2UCDH8owEk996gkbrpsgGtrJLM3J7jGxl9Ic7Qwwj4ivOE5AWZWRMecDdF7hqGjFA==}
engines: {node: '>=10'}
chalk@2.4.2:
resolution: {integrity: sha512-Mti+f9lpJNcwF4tWV8/OrTTtF1gZi+f8FqlyAdouralcFWFQWF2+NgCHShjkCb+IFBLq9buZwE1xckQU4peSuQ==}
engines: {node: '>=4'}
@ -718,6 +729,9 @@ packages:
resolution: {integrity: sha512-7VT13fmjotKpGipCW9JEQAusEPE+Ei8nl6/g4FBAmIm0GOOLMua9NDDo/DWp0ZAxCr3cPq5ZpBqmPAQgDda2Pw==}
engines: {node: '>= 8.10.0'}
cliui@7.0.4:
resolution: {integrity: sha512-OcRE68cOsVMXp1Yvonl/fzkQOyjLSu/8bhPDfQt0e0/Eb283TKP20Fs2MqoPsr9SwA595rRCA+QMzYc9nBP+JQ==}
cliui@8.0.1:
resolution: {integrity: sha512-BSeNnyus75C4//NQ9gQt1/csTXyo/8Sb+afLAkzAptFuMsod9HFokGNudZpi/oQV73hnVK+sR+5PVRMd+Dr7YQ==}
engines: {node: '>=12'}
@ -759,10 +773,18 @@ packages:
supports-color:
optional: true
decamelize@4.0.0:
resolution: {integrity: sha512-9iE1PgSik9HeIIw2JO94IidnE3eBoQrFJ3w7sFuzSX4DpmZ3v5sZpUiV5Swcf6mQEF+Y0ru8Neo+p+nyh2J+hQ==}
engines: {node: '>=10'}
diff@4.0.2:
resolution: {integrity: sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==}
engines: {node: '>=0.3.1'}
diff@5.2.0:
resolution: {integrity: sha512-uIFDxqpRZGZ6ThOk84hEfqWoHx2devRFvpTZcTHur85vImfaxUbTW9Ryh4CpCuDnToOP1CEtXKIgytHBPVff5A==}
engines: {node: '>=0.3.1'}
dotenv@16.4.5:
resolution: {integrity: sha512-ZmdL2rui+eB2YwhsWzjInR8LldtZHGDoQ1ugH85ppHKwpUHL7j7rN0Ti9NCnGiQbhaZ11FpR+7ao1dNsmduNUg==}
engines: {node: '>=12'}
@ -786,6 +808,10 @@ packages:
resolution: {integrity: sha512-vbRorB5FUQWvla16U8R/qgaFIya2qGzwDrNmCZuYKrbdSUMG6I1ZCGQRefkRVhuOkIGVne7BQ35DSfo1qvJqFg==}
engines: {node: '>=0.8.0'}
escape-string-regexp@4.0.0:
resolution: {integrity: sha512-TtpcNJ3XAzx3Gq8sWRzJaVajRs0uVxA2YAkdb1jm2YkPz4G6egUFAyA3n5vtEIZefPk5Wa4UXbKuS5fKkJWdgA==}
engines: {node: '>=10'}
events@3.3.0:
resolution: {integrity: sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==}
engines: {node: '>=0.8.x'}
@ -798,9 +824,16 @@ packages:
resolution: {integrity: sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==}
engines: {node: '>=8'}
fluture@14.0.0:
resolution: {integrity: sha512-pENtLF948a8DfduVKugT8edTAbFi4rBS94xjHwzLanQqIu5PYtLGl+xqs6H8TaIRL7z/B0cDpswdINzH/HRUGA==}
engines: {node: '>=4.0.0'}
find-up@5.0.0:
resolution: {integrity: sha512-78/PXT1wlLLDgTzDs7sjq9hzz0vXD+zn+7wypEe4fXQxCmdmqfGsEPQxmiCSQI3ajFV91bVSsvNtrJRiW6nGng==}
engines: {node: '>=10'}
flat@5.0.2:
resolution: {integrity: sha512-b6suED+5/3rTpUBdG1gupIl8MPFCAMA0QXwmljLhvCUKcUvdE4gWky9zpuGCcXHOsz4J9wPGNWq6OKpmIzz3hQ==}
hasBin: true
fs.realpath@1.0.0:
resolution: {integrity: sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==}
fsevents@2.3.3:
resolution: {integrity: sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw==}
@ -818,6 +851,11 @@ packages:
resolution: {integrity: sha512-AOIgSQCepiJYwP3ARnGx+5VnTu2HBYdzbGP45eLw1vr3zB3vZLeyed1sC9hnbcOc9/SrMyM5RPQrkGz4aS9Zow==}
engines: {node: '>= 6'}
glob@8.1.0:
resolution: {integrity: sha512-r8hpEjiQEYlF2QU0df3dS+nxxSIreXQS1qRhMJM0Q5NDdR386C7jb7Hwwod8Fgiuex+k0GFjgft18yvxm5XoCQ==}
engines: {node: '>=12'}
deprecated: Glob versions prior to v9 are no longer supported
graphile-config@0.0.1-beta.9:
resolution: {integrity: sha512-7vNxXZ24OAgXxDKXYi9JtgWPMuNbBL3057Yf32Ux+/rVP4+EePgySCc+NNnn0tORi8qwqVreN8bdWqGIcSwNXg==}
engines: {node: '>=16'}
@ -835,6 +873,10 @@ packages:
resolution: {integrity: sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ==}
engines: {node: '>=8'}
he@1.2.0:
resolution: {integrity: sha512-F/1DnUGPopORZi0ni+CvrCgHQ5FyEAHRLSApuYWMmrbSwoN2Mn/7k+Gl38gJnR7yyDZk6WLXwiGod1JOWNDKGw==}
hasBin: true
ieee754@1.2.1:
resolution: {integrity: sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA==}
@ -845,6 +887,10 @@ packages:
resolution: {integrity: sha512-veYYhQa+D1QBKznvhUHxb8faxlrwUnxseDAbAp457E0wLNio2bOSKnjYDhMj+YiAq61xrMGhQk9iXVk5FzgQMw==}
engines: {node: '>=6'}
inflight@1.0.6:
resolution: {integrity: sha512-k92I/b08q4wvFscXCLvqfsHCrjrF7yiXsQuIVvVE7N82W3+aqpzuUdBbfhWcy/FZR3/4IgflMgKLOsvPDrGCJA==}
deprecated: This module is not supported, and leaks memory. Do not use it. Check out lru-cache if you want a good and tested way to coalesce async requests by a key value, which is much more comprehensive and powerful.
inherits@2.0.4:
resolution: {integrity: sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==}
@ -871,10 +917,22 @@ packages:
resolution: {integrity: sha512-xelSayHH36ZgE7ZWhli7pW34hNbNl8Ojv5KVmkJD4hBdD3th8Tfk9vYasLM+mXWOZhFkgZfxhLSnrwRr4elSSg==}
engines: {node: '>=0.10.0'}
is-network-error@1.1.0:
resolution: {integrity: sha512-tUdRRAnhT+OtCZR/LxZelH/C7QtjtFrTu5tXCA8pl55eTUElUHT+GPYV8MBMBvea/j+NxQqVt3LbWMRir7Gx9g==}
engines: {node: '>=16'}
is-number@7.0.0:
resolution: {integrity: sha512-41Cifkg6e8TylSpdtTpeLVMqvSBEVzTttHvERD741+pnZ8ANv0004MRL43QKPDlK9cGvNp6NZWZUBlbGXYxxng==}
engines: {node: '>=0.12.0'}
is-plain-obj@2.1.0:
resolution: {integrity: sha512-YWnfyRwxL/+SsrWYfOpUtz5b3YD+nyfkHvjbcanzk8zgyO4ASD67uVMRt8k5bM4lLMDnXfriRhOpemw+NfT1eA==}
engines: {node: '>=8'}
is-unicode-supported@0.1.0:
resolution: {integrity: sha512-knxG2q4UC3u8stRGyAVJCOdxFmv5DZiRcdlIaAQXAbSfJya+OhopNotLQrstBhququ4ZpuKbDc/8S6mgXgPFPw==}
engines: {node: '>=10'}
js-tokens@4.0.0:
resolution: {integrity: sha512-RdJUflcE3cUzKiMqQgsCu06FPu9UdIJO0beYbPhHN4k6apgJtifcoCtT9bcxOpYBtpD2kCM6Sbzg4CausW/PKQ==}
@ -893,15 +951,35 @@ packages:
lines-and-columns@1.2.4:
resolution: {integrity: sha512-7ylylesZQ/PV29jhEDl3Ufjo6ZX7gCqJr5F7PKrqc93v7fzSymt1BpwEU8nAUXs8qzzvqhbjhK5QZg6Mt/HkBg==}
locate-path@6.0.0:
resolution: {integrity: sha512-iPZK6eYjbxRu3uB4/WZ3EsEIMJFMqAoopl3R+zuq0UjcAm/MO6KCweDgPfP3elTztoKP3KtnVHxTn2NHBSDVUw==}
engines: {node: '>=10'}
log-symbols@4.1.0:
resolution: {integrity: sha512-8XPvpAA8uyhfteu8pIvQxpJZ7SYYdpUivZpGy6sFsBuKRY/7rQGavedeB8aK+Zkyq6upMFVL/9AW6vOYzfRyLg==}
engines: {node: '>=10'}
make-error@1.3.6:
resolution: {integrity: sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==}
minimatch@3.1.2:
resolution: {integrity: sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==}
minimatch@5.1.6:
resolution: {integrity: sha512-lKwV/1brpG6mBUFHtb7NUmtABCb2WZZmm2wNiOA5hAb8VdCS4B3dtMWyvcoViccwAW/COERjXLt0zP1zXUN26g==}
engines: {node: '>=10'}
mocha@10.7.3:
resolution: {integrity: sha512-uQWxAu44wwiACGqjbPYmjo7Lg8sFrS3dQe7PP2FQI+woptP4vZXSMcfMyFL/e1yFEeEpV4RtyTpZROOKmxis+A==}
engines: {node: '>= 14.0.0'}
hasBin: true
ms@2.1.2:
resolution: {integrity: sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==}
ms@2.1.3:
resolution: {integrity: sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==}
nodemon@3.1.4:
resolution: {integrity: sha512-wjPBbFhtpJwmIeY2yP7QF+UKzPfltVGtfce1g/bB15/8vCGZj8uxD62b/b9M9/WVgme0NZudpownKN+c0plXlQ==}
engines: {node: '>=10'}
@ -914,6 +992,21 @@ packages:
obuf@1.1.2:
resolution: {integrity: sha512-PX1wu0AmAdPqOL1mWhqmlOd8kOIZQwGZw6rh7uby9fTc5lhaOWFLX3I6R1hrF9k3zUY40e6igsLGkDXK92LJNg==}
once@1.4.0:
resolution: {integrity: sha512-lNaJgI+2Q5URQBkccEKHTQOPaXdUxnZZElQTZY0MFUAuaEqe1E+Nyvgdz/aIyNi6Z9MzO5dv1H8n58/GELp3+w==}
p-limit@3.1.0:
resolution: {integrity: sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==}
engines: {node: '>=10'}
p-locate@5.0.0:
resolution: {integrity: sha512-LaNjtRWUBY++zB5nE/NwcaoMylSPk+S+ZHNB1TzdbMJMny6dynpAGt7X/tl/QYq3TIeE6nxHppbo2LGymrG5Pw==}
engines: {node: '>=10'}
p-retry@6.2.0:
resolution: {integrity: sha512-JA6nkq6hKyWLLasXQXUrO4z8BUZGUt/LjlJxx8Gb2+2ntodU/SS63YZ8b0LUTbQ8ZB9iwOfhEPhg4ykKnn2KsA==}
engines: {node: '>=16.17'}
parent-module@1.0.1:
resolution: {integrity: sha512-GQ2EWRpQV8/o+Aw8YqtfZZPfNRWZYkbidE9k5rpl/hC3vtHHBfGm2Ifi6qWV+coDGkrUKZAxE3Lot5kcsRlh+g==}
engines: {node: '>=6'}
@ -922,6 +1015,10 @@ packages:
resolution: {integrity: sha512-ayCKvm/phCGxOkYRSCM82iDwct8/EonSEgCSxWxD7ve6jHggsFl4fZVQBPRNgQoKiuV/odhFrGzQXZwbifC8Rg==}
engines: {node: '>=8'}
path-exists@4.0.0:
resolution: {integrity: sha512-ak9Qy5Q7jYb2Wwcey5Fpvg2KoAc/ZIhLSLOSBmRmygPsGwkVVt0fZa0qrtMz+m6tJTAHfZQ8FnmB4MG4LWy7/w==}
engines: {node: '>=8'}
path-type@4.0.0:
resolution: {integrity: sha512-gDKb8aZMDeD/tZWs9P6+q0J9Mwkdl6xMV8TjnGP3qJVJ06bdMgkbBlLU8IdfOsIsFz2BW1rNVT3XuNEl8zPAvw==}
engines: {node: '>=8'}
@ -1013,8 +1110,8 @@ packages:
pstree.remy@1.1.8:
resolution: {integrity: sha512-77DZwxQmxKnu3aR542U+X8FypNzbfJ+C5XQDk3uWjWxn6151aIMGthWYRXTqT1E5oJvg+ljaa2OJi+VfvCOQ8w==}
ramda@0.30.1:
resolution: {integrity: sha512-tEF5I22zJnuclswcZMc8bDIrwRHRzf+NqVEmqg50ShAZMP7MWeR/RGDthfM/p+BlqvF2fXAzpn8i+SJcYD3alw==}
randombytes@2.1.0:
resolution: {integrity: sha512-vYl3iOX+4CKUWuxGi9Ukhie6fsqXqS9FE2Zaic4tNFD2N2QQaXOMFbuKK4QmDHC0JO6B1Zp41J0LpT0oR68amQ==}
readable-stream@3.6.2:
resolution: {integrity: sha512-9u/sniCrY3D5WdsERHzHE4G2YCXqoG5FTHUiCC4SIbr6XcLZBY05ya9EKjYek9O5xOAwjGq+1JdGBAS7Q9ScoA==}
@ -1035,20 +1132,21 @@ packages:
resolve-pkg-maps@1.0.0:
resolution: {integrity: sha512-seS2Tj26TBVOC2NIc2rOe2y2ZO7efxITtLZcGSOnHHNOQ7CkiUBfw0Iw2ck6xkIhPwLhKNLS8BO+hEpngQlqzw==}
retry@0.13.1:
resolution: {integrity: sha512-XQBQ3I8W1Cge0Seh+6gjj03LbmRFWuoszgK9ooCpwYIrhhoO80pfq4cUkU5DkknwfOfFteRwlZ56PYOGYyFWdg==}
engines: {node: '>= 4'}
safe-buffer@5.2.1:
resolution: {integrity: sha512-rp3So07KcdmmKbGvgaNxQSJr7bGVSVk5S9Eq1F+ppbRo70+YeaDxkw5Dd8NPN+GD6bjnYm2VuPuCXmpuYvmCXQ==}
sanctuary-show@2.0.0:
resolution: {integrity: sha512-REj4ZiioUXnDLj6EpJ9HcYDIEGaEexmB9Fg5o6InZR9f0x5PfnnC21QeU9SZ9E7G8zXSZPNjy8VRUK4safbesw==}
sanctuary-type-identifiers@3.0.0:
resolution: {integrity: sha512-YFXYcG0Ura1dSPd/1xLYtE2XAWUEsBHhMTZvYBOvwT8MeFQwdUOCMm2DC+r94z6H93FVq0qxDac8/D7QpJj6Mg==}
semver@7.6.3:
resolution: {integrity: sha512-oVekP1cKtI+CTDvHWYFUcMtsK/00wmAEfyqKfNdARm8u1wNVhSgaX7A8d4UuIlUI5e84iEwOhs7ZPYRmzU9U6A==}
engines: {node: '>=10'}
hasBin: true
serialize-javascript@6.0.2:
resolution: {integrity: sha512-Saa1xPByTTq2gdeFZYLLo+RFE35NHZkAbqZeWNd3BpzppeVisAqpDjcp8dyf6uIvEqJRd46jemmyA4iFIeVk8g==}
simple-update-notifier@2.0.0:
resolution: {integrity: sha512-a2B9Y0KlNXl9u/vsW6sTIu9vGEpfKu2wRV6l1H3XEas/0gUIzGzBoP/IouTcUQbm9JWZLH3COxyn03TYlFax6w==}
engines: {node: '>=10'}
@ -1071,6 +1169,10 @@ packages:
resolution: {integrity: sha512-Y38VPSHcqkFrCpFnQ9vuSXmquuv5oXOKpGeT6aGrr3o3Gc9AlVa6JBfUSOCnbxGGZF+/0ooI7KrPuUSztUdU5A==}
engines: {node: '>=8'}
strip-json-comments@3.1.1:
resolution: {integrity: sha512-6fPc+R4ihwqP6N/aIv2f1gMH8lOVtWQHoqC4yK6oSDVVocumAsfCqjkXnqiYMhmMwS/mEHLp7Vehlt3ql6lEig==}
engines: {node: '>=8'}
strnum@1.0.5:
resolution: {integrity: sha512-J8bbNyKKXl5qYcR36TIO8W3mVGVHrmmxsd5PAItGkmyzwJvybiw2IVq5nqd0i4LSNSkB/sx9VHllbfFdr9k1JA==}
@ -1082,6 +1184,10 @@ packages:
resolution: {integrity: sha512-qpCAvRl9stuOHveKsn7HncJRvv501qIacKzQlO/+Lwxc9+0q2wLyv4Dfvt80/DPn2pqOBsJdDiogXGR9+OvwRw==}
engines: {node: '>=8'}
supports-color@8.1.1:
resolution: {integrity: sha512-MpUEN2OodtUzxvKQl72cUF7RQ5EiHsGvSsVG0ia9c5RbWGL2CI4C7EpPS8UTBIplnlzZiNuV56w+FuNxy3ty2Q==}
engines: {node: '>=10'}
to-regex-range@5.0.1:
resolution: {integrity: sha512-65P7iz6X5yEr1cwcgvQxbbIw7Uk3gOy5dIdtZ4rDveLqhrdJP+Li/Hx6tyK0NEb+2GCyneCMJiGqrADCSNk8sQ==}
engines: {node: '>=8.0'}
@ -1104,9 +1210,6 @@ packages:
'@swc/wasm':
optional: true
ts-toolbelt@9.6.0:
resolution: {integrity: sha512-nsZd8ZeNUzukXPlJmTBwUAuABDe/9qtVDelJeT/qW0ow3ZS3BsQJtNkan1802aM9Uf68/Y8ljw86Hu0h5IUW3w==}
tslib@2.7.0:
resolution: {integrity: sha512-gLXCKdN1/j47AiHiOkJN69hJmcbGTHI0ImLmbYLHykhgeN0jVGola9yVjFgzCUklsZQMW55o+dW7IXv3RCXDzA==}
@ -1115,9 +1218,6 @@ packages:
engines: {node: '>=18.0.0'}
hasBin: true
types-ramda@0.30.1:
resolution: {integrity: sha512-1HTsf5/QVRmLzcGfldPFvkVsAdi1db1BBKzi7iW3KBUlOICg/nKnFS+jGqDJS3YD8VsWbAh7JiHeBvbsw8RPxA==}
typescript@5.5.4:
resolution: {integrity: sha512-Mtq29sKDAEYP7aljRgtPOpTvOfbwRWlS6dPRzwjdE+C0R4brX/GUyhHSecbHMFLNBLcJIPt9nl9yG5TZ1weH+Q==}
engines: {node: '>=14.17'}
@ -1139,10 +1239,16 @@ packages:
v8-compile-cache-lib@3.0.1:
resolution: {integrity: sha512-wa7YjyUGfNZngI/vtK0UHAN+lgDCxBPCylVXGp0zu59Fz5aiGtNXaq3DhIov063MorB+VfufLh3JlF2KdTK3xg==}
workerpool@6.5.1:
resolution: {integrity: sha512-Fs4dNYcsdpYSAfVxhnl1L5zTksjvOJxtC5hzMNl+1t9B8hTJTdKDyZ5ju7ztgPy+ft9tBFXoOlDNiOT9WUXZlA==}
wrap-ansi@7.0.0:
resolution: {integrity: sha512-YVGIj2kamLSTxw6NsZjoBxfSwsn0ycdesmc4p+Q21c5zPuZ1pl+NfxVdxPtdHvmNVOQ6XSYG4AUtyt/Fi7D16Q==}
engines: {node: '>=10'}
wrappy@1.0.2:
resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==}
xtend@4.0.2:
resolution: {integrity: sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==}
engines: {node: '>=0.4'}
@ -1151,10 +1257,22 @@ packages:
resolution: {integrity: sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==}
engines: {node: '>=10'}
yargs-parser@20.2.9:
resolution: {integrity: sha512-y11nGElTIV+CT3Zv9t7VKl+Q3hTQoT9a1Qzezhhl6Rp21gJ/IVTW7Z3y9EWXhuUBC2Shnf+DX0antecpAwSP8w==}
engines: {node: '>=10'}
yargs-parser@21.1.1:
resolution: {integrity: sha512-tVpsJW7DdjecAiFpbIB1e3qxIQsE6NoPc5/eTdrbbIC4h0LVsWhnoa3g+m2HclBIujHzsxZ4VJVA+GUuc2/LBw==}
engines: {node: '>=12'}
yargs-unparser@2.0.0:
resolution: {integrity: sha512-7pRTIA9Qc1caZ0bZ6RYRGbHJthJWuakf+WmHK0rVeLkNrrGhfoabBNdue6kdINI6r4if7ocq9aD/n7xwKOdzOA==}
engines: {node: '>=10'}
yargs@16.2.0:
resolution: {integrity: sha512-D1mvvtDG0L5ft/jGWkLpG1+m0eQxOfaBvTNELraWj22wSVUMWxZUvYgJYcKh6jGGIkJFhH4IZPQhR4TKpc8mBw==}
engines: {node: '>=10'}
yargs@17.7.2:
resolution: {integrity: sha512-7dSzzRQ++CKnNI/krKnYRV7JKKPUXMEh61soaHKg9mrWEhzFWhFnxPxGl+69cD1Ou63C13NUPCnmIcrvqCuM6w==}
engines: {node: '>=12'}
@ -1163,6 +1281,10 @@ packages:
resolution: {integrity: sha512-Ux4ygGWsu2c7isFWe8Yu1YluJmqVhxqK2cLXNQA5AcC3QfbGNpM7fu0Y8b/z16pXLnFxZYvWhd3fhBY9DLmC6Q==}
engines: {node: '>=6'}
yocto-queue@0.1.0:
resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==}
engines: {node: '>=10'}
snapshots:
'@aws-crypto/crc32@5.2.0':
@ -2138,9 +2260,7 @@ snapshots:
pg-protocol: 1.6.1
pg-types: 4.0.2
'@types/ramda@0.30.2':
dependencies:
types-ramda: 0.30.1
'@types/retry@0.12.2': {}
'@types/semver@7.5.8': {}
@ -2150,6 +2270,8 @@ snapshots:
acorn@8.12.1: {}
ansi-colors@4.1.3: {}
ansi-regex@5.0.1: {}
ansi-styles@3.2.1:
@ -2182,10 +2304,16 @@ snapshots:
balanced-match: 1.0.2
concat-map: 0.0.1
brace-expansion@2.0.1:
dependencies:
balanced-match: 1.0.2
braces@3.0.3:
dependencies:
fill-range: 7.1.1
browser-stdout@1.3.1: {}
buffer@5.6.0:
dependencies:
base64-js: 1.5.1
@ -2193,6 +2321,8 @@ snapshots:
callsites@3.1.0: {}
camelcase@6.3.0: {}
chalk@2.4.2:
dependencies:
ansi-styles: 3.2.1
@ -2216,6 +2346,12 @@ snapshots:
optionalDependencies:
fsevents: 2.3.3
cliui@7.0.4:
dependencies:
string-width: 4.2.3
strip-ansi: 6.0.1
wrap-ansi: 7.0.0
cliui@8.0.1:
dependencies:
string-width: 4.2.3
@ -2247,14 +2383,28 @@ snapshots:
create-require@1.1.1: {}
debug@4.3.6:
dependencies:
ms: 2.1.2
debug@4.3.6(supports-color@5.5.0):
dependencies:
ms: 2.1.2
optionalDependencies:
supports-color: 5.5.0
debug@4.3.6(supports-color@8.1.1):
dependencies:
ms: 2.1.2
optionalDependencies:
supports-color: 8.1.1
decamelize@4.0.0: {}
diff@4.0.2: {}
diff@5.2.0: {}
dotenv@16.4.5: {}
emoji-regex@8.0.0: {}
@ -2294,6 +2444,8 @@ snapshots:
escape-string-regexp@1.0.5: {}
escape-string-regexp@4.0.0: {}
events@3.3.0: {}
fast-xml-parser@4.4.1:
@ -2304,10 +2456,14 @@ snapshots:
dependencies:
to-regex-range: 5.0.1
fluture@14.0.0:
find-up@5.0.0:
dependencies:
sanctuary-show: 2.0.0
sanctuary-type-identifiers: 3.0.0
locate-path: 6.0.0
path-exists: 4.0.0
flat@5.0.2: {}
fs.realpath@1.0.0: {}
fsevents@2.3.3:
optional: true
@ -2322,13 +2478,21 @@ snapshots:
dependencies:
is-glob: 4.0.3
glob@8.1.0:
dependencies:
fs.realpath: 1.0.0
inflight: 1.0.6
inherits: 2.0.4
minimatch: 5.1.6
once: 1.4.0
graphile-config@0.0.1-beta.9:
dependencies:
'@types/interpret': 1.1.3
'@types/node': 20.16.3
'@types/semver': 7.5.8
chalk: 4.1.2
debug: 4.3.6(supports-color@5.5.0)
debug: 4.3.6
interpret: 3.1.1
semver: 7.6.3
tslib: 2.7.0
@ -2356,6 +2520,8 @@ snapshots:
has-flag@4.0.0: {}
he@1.2.0: {}
ieee754@1.2.1: {}
ignore-by-default@1.0.1: {}
@ -2365,6 +2531,11 @@ snapshots:
parent-module: 1.0.1
resolve-from: 4.0.0
inflight@1.0.6:
dependencies:
once: 1.4.0
wrappy: 1.0.2
inherits@2.0.4: {}
interpret@3.1.1: {}
@ -2383,8 +2554,14 @@ snapshots:
dependencies:
is-extglob: 2.1.1
is-network-error@1.1.0: {}
is-number@7.0.0: {}
is-plain-obj@2.1.0: {}
is-unicode-supported@0.1.0: {}
js-tokens@4.0.0: {}
js-yaml@4.1.0:
@ -2397,14 +2574,52 @@ snapshots:
lines-and-columns@1.2.4: {}
locate-path@6.0.0:
dependencies:
p-locate: 5.0.0
log-symbols@4.1.0:
dependencies:
chalk: 4.1.2
is-unicode-supported: 0.1.0
make-error@1.3.6: {}
minimatch@3.1.2:
dependencies:
brace-expansion: 1.1.11
minimatch@5.1.6:
dependencies:
brace-expansion: 2.0.1
mocha@10.7.3:
dependencies:
ansi-colors: 4.1.3
browser-stdout: 1.3.1
chokidar: 3.6.0
debug: 4.3.6(supports-color@8.1.1)
diff: 5.2.0
escape-string-regexp: 4.0.0
find-up: 5.0.0
glob: 8.1.0
he: 1.2.0
js-yaml: 4.1.0
log-symbols: 4.1.0
minimatch: 5.1.6
ms: 2.1.3
serialize-javascript: 6.0.2
strip-json-comments: 3.1.1
supports-color: 8.1.1
workerpool: 6.5.1
yargs: 16.2.0
yargs-parser: 20.2.9
yargs-unparser: 2.0.0
ms@2.1.2: {}
ms@2.1.3: {}
nodemon@3.1.4:
dependencies:
chokidar: 3.6.0
@ -2422,6 +2637,24 @@ snapshots:
obuf@1.1.2: {}
once@1.4.0:
dependencies:
wrappy: 1.0.2
p-limit@3.1.0:
dependencies:
yocto-queue: 0.1.0
p-locate@5.0.0:
dependencies:
p-limit: 3.1.0
p-retry@6.2.0:
dependencies:
'@types/retry': 0.12.2
is-network-error: 1.1.0
retry: 0.13.1
parent-module@1.0.1:
dependencies:
callsites: 3.1.0
@ -2433,6 +2666,8 @@ snapshots:
json-parse-even-better-errors: 2.3.1
lines-and-columns: 1.2.4
path-exists@4.0.0: {}
path-type@4.0.0: {}
pg-cloudflare@1.1.1:
@ -2510,7 +2745,9 @@ snapshots:
pstree.remy@1.1.8: {}
ramda@0.30.1: {}
randombytes@2.1.0:
dependencies:
safe-buffer: 5.2.1
readable-stream@3.6.2:
dependencies:
@ -2528,14 +2765,16 @@ snapshots:
resolve-pkg-maps@1.0.0: {}
retry@0.13.1: {}
safe-buffer@5.2.1: {}
sanctuary-show@2.0.0: {}
sanctuary-type-identifiers@3.0.0: {}
semver@7.6.3: {}
serialize-javascript@6.0.2:
dependencies:
randombytes: 2.1.0
simple-update-notifier@2.0.0:
dependencies:
semver: 7.6.3
@ -2561,6 +2800,8 @@ snapshots:
dependencies:
ansi-regex: 5.0.1
strip-json-comments@3.1.1: {}
strnum@1.0.5: {}
supports-color@5.5.0:
@ -2571,6 +2812,10 @@ snapshots:
dependencies:
has-flag: 4.0.0
supports-color@8.1.1:
dependencies:
has-flag: 4.0.0
to-regex-range@5.0.1:
dependencies:
is-number: 7.0.0
@ -2595,8 +2840,6 @@ snapshots:
v8-compile-cache-lib: 3.0.1
yn: 3.1.1
ts-toolbelt@9.6.0: {}
tslib@2.7.0: {}
tsx@4.19.0:
@ -2606,10 +2849,6 @@ snapshots:
optionalDependencies:
fsevents: 2.3.3
types-ramda@0.30.1:
dependencies:
ts-toolbelt: 9.6.0
typescript@5.5.4: {}
undefsafe@2.0.5: {}
@ -2622,18 +2861,41 @@ snapshots:
v8-compile-cache-lib@3.0.1: {}
workerpool@6.5.1: {}
wrap-ansi@7.0.0:
dependencies:
ansi-styles: 4.3.0
string-width: 4.2.3
strip-ansi: 6.0.1
wrappy@1.0.2: {}
xtend@4.0.2: {}
y18n@5.0.8: {}
yargs-parser@20.2.9: {}
yargs-parser@21.1.1: {}
yargs-unparser@2.0.0:
dependencies:
camelcase: 6.3.0
decamelize: 4.0.0
flat: 5.0.2
is-plain-obj: 2.1.0
yargs@16.2.0:
dependencies:
cliui: 7.0.4
escalade: 3.2.0
get-caller-file: 2.0.5
require-directory: 2.1.1
string-width: 4.2.3
y18n: 5.0.8
yargs-parser: 20.2.9
yargs@17.7.2:
dependencies:
cliui: 8.0.1
@ -2645,3 +2907,5 @@ snapshots:
yargs-parser: 21.1.1
yn@3.1.1: {}
yocto-queue@0.1.0: {}

View File

@ -14,6 +14,7 @@ if (!process.env.S3_REGION) throw new Error('Missing S3_REGION env var');
if (!process.env.S3_ENDPOINT) throw new Error('Missing S3_REGION env var');
if (!process.env.S3_MAIN_BUCKET) throw new Error('Missing S3_BUCKET env var');
if (!process.env.S3_USC_BUCKET) throw new Error('Missing S3_USC_BUCKET env var');
if (!process.env.CACHE_DIR) throw new Error('Missing CACHE_DIR env var');
const postgrestUrl = process.env.POSTGREST_URL!
const automationUserJwt = process.env.AUTOMATION_USER_JWT!
const connectionString = process.env.WORKER_CONNECTION_STRING!
@ -23,6 +24,7 @@ const s3Endpoint = process.env.S3_ENDPOINT!
const s3SecretAccessKey = process.env.S3_SECRET_ACCESS_KEY!
const s3MainBucket = process.env.S3_MAIN_BUCKET!
const s3UscBucket = process.env.S3_USC_BUCKET!
const cacheDir = process.env.CACHE_DIR!
export interface Config {
postgrestUrl: string;
@ -34,6 +36,7 @@ export interface Config {
s3Endpoint: string;
s3UscBucket: string;
s3MainBucket: string;
cacheDir: string;
}
@ -47,4 +50,5 @@ export const configs: Config = {
s3Region,
s3MainBucket,
s3UscBucket,
cacheDir,
}

View File

@ -4,19 +4,20 @@ import { basename, join } from 'node:path';
import { S3Client } from "@aws-sdk/client-s3"
import { Upload } from "@aws-sdk/lib-storage"
import { createId } from '@paralleldrive/cuid2';
// import { downloadS3File, uploadToS3, createStrapiB2File, createStrapiVod, createStrapiStream } from '@futureporn/storage'
// import { concatenateVideoSegments } from '@futureporn/video'
import { execFile } from 'node:child_process';
import { configs } from '../config';
import { pipeline, PassThrough, Readable } from 'node:stream';
import { createReadStream, createWriteStream, write } from 'node:fs';
import { writeFile, readFile } from 'node:fs/promises';
import { PassThrough, Readable } from 'node:stream';
import { createReadStream } from 'node:fs';
import { stat, writeFile } from 'node:fs/promises';
import { pipeline } from 'node:stream/promises';
import { tmpdir } from 'node:os';
import { promisify } from 'node:util';
import patchVodInDatabase from '@futureporn/fetchers/patchVodInDatabase.ts'
import { downloadFile } from '@futureporn/storage/s3.ts';
import { S3FileRecord, VodRecord } from '@futureporn/types';
const pipelinePromise = promisify(pipeline)
import { S3FileRecord, SegmentResponse } from '@futureporn/types';
import getVod from '@futureporn/fetchers/getVod.ts';
import { type S3ClientConfig } from "@aws-sdk/client-s3";
import pRetry, {AbortError} from 'p-retry'
import { getFileChecksum, getTmpFile } from '@futureporn/utils/file.ts';
interface s3ManifestEntry {
key: string;
@ -24,8 +25,8 @@ interface s3ManifestEntry {
}
interface Payload {
s3_manifest: s3ManifestEntry[];
vod_id?: string;
s3_manifest?: S3FileRecord[];
}
interface S3Target {
@ -45,10 +46,16 @@ interface S3UploadParameters {
function assertPayload(payload: any): asserts payload is Payload {
if (typeof payload !== "object" || !payload) throw new Error("invalid payload");
if (typeof payload.s3_manifest !== "object") throw new Error("invalid s3_manifest");
}
async function withRetry<T>(fn: () => Promise<T>, retries = 3): Promise<T> {
return pRetry(fn, {
onFailedAttempt: (e: Error) => {
console.error(`Error during attempt:`, e);
},
retries
});
}
/**
*
@ -143,17 +150,78 @@ const getS3ParallelUpload = async function ({
}
/**
* Checks if a file exists at the specified path.
*/
async function fileExists(path: string): Promise<boolean> {
try {
await stat(path);
return true;
} catch {
return false;
}
}
/**
* Validates checksum if an expected checksum is provided.
*/
async function validateChecksumIfNeeded(filePath: string, segment: SegmentResponse) {
const expectedChecksum = segment?.checksum;
if (expectedChecksum) {
const actualChecksum = await getFileChecksum(filePath, 'md5');
if (expectedChecksum !== actualChecksum) {
throw new Error(`Downloaded segment ${segment.id} but the expected checksum ${expectedChecksum} did not match actual ${actualChecksum}.`);
}
}
}
export const combine_video_segments: Task = async function (payload: unknown, helpers: Helpers) {
// helpers.logger.info('the following is the raw Task payload')
// helpers.logger.info(payload)
// helpers.logger.info(JSON.stringify(payload?.s3_manifest))
assertPayload(payload)
const { s3_manifest, vod_id } = payload
if (!vod_id) throw new Error('combine_video_segments was called without a vod_id.');
helpers.logger.info(`🏗️ combine_video_segments started with s3_manifest=${JSON.stringify(s3_manifest)}, vod_id=${vod_id}`)
/**
* downloadSegment
*
* - [x] If the file is already in local cache, that file is used.
* - [x] Validates checksum to ensure file is whole
*/
async function downloadSegment(client: S3Client, segment: SegmentResponse) {
if (!segment) throw new Error('segment passed to downloadSegment was missing');
if (!segment.s3_key) throw new Error('segment passed to downloadSegment was missing s3_key');
const cachePath = join(configs.cacheDir, segment.s3_key);
const isFileInCache = await fileExists(cachePath);
// Check and return cache if available
if (isFileInCache) {
await validateChecksumIfNeeded(cachePath, segment);
return cachePath;
}
// Download segment and validate checksum
const tmpFilePath = await withRetry(() => downloadFile(client, configs.s3UscBucket, segment.s3_key), 3);
await validateChecksumIfNeeded(tmpFilePath, segment);
return tmpFilePath;
}
/**
* downloadSegments
*
* Download a list of segments from S3.
*/
async function downloadSegments(client: S3Client, segments: SegmentResponse[]) {
for (const segment of segments) {
const segmentFilePath = await downloadSegment(client, segment)
}
}
/**
* doIntegratedCombine
*
* Integrated combine_video_segments is when the requester is giving us only a vod_id
* It's our job to inspect the vod and get it's segments and combine them
* Then upload the result to S3 and update the vod's s3_file
*
*/
async function doIntegratedCombine(helpers: Helpers, vod_id: string) {
/**
* Here we take a manifest of S3 files and we download each of them.
* Then we combine them all, preserving original order using `ffmpeg -f concat`
@ -167,25 +235,44 @@ export const combine_video_segments: Task = async function (payload: unknown, he
* we edit the stream record, adding a relation to the vod we just created.
*/
try {
const vod = await getVod(vod_id)
if (!vod) throw new Error('doIntegratedCombine failed to get vod');
const client = new S3Client({
// * [x] Get a list of segments associated with this vod.
// * [ ] Download the segments (or pull from disk cache)
// * [ ] VALIDATE segment checksums(?)
// * [ ] Combine segments using ffmpeg
// * [ ] Upload resulting video
// * [ ] Create s3_file with resulting video information
// * [ ] Update s3_file vod record to reference s3_file
const options: S3ClientConfig = {
endpoint: configs.s3Endpoint,
region: configs.s3Region,
credentials: {
accessKeyId: configs.s3AccessKeyId,
secretAccessKey: configs.s3SecretAccessKey
}
});
}
const client = new S3Client(options);
if (!vod.segments) throw new Error('vod.segments was missing');
const segments = vod.segments
const localSegments = await downloadSegments(client, segments)
const s3Manifest = s3_manifest
const inputVideoFilePaths = await Promise.all(s3Manifest.filter((m) => (m.bytes !== 0)).map((m) => downloadFile(client, configs.s3UscBucket, m.key)))
const inputVideoFilePaths = await Promise.all(s3Manifest.filter((m: S3FileRecord) => (m.bytes !== 0)).map((m) => downloadFile(client, configs.s3UscBucket, m.key)))
const concatenatedVideoFile = await concatVideos(inputVideoFilePaths)
const s3KeyName = basename(concatenatedVideoFile)
const inputStream = createReadStream(concatenatedVideoFile)
const filePath = concatenatedVideoFile
const { uploadStream, upload } = await getS3ParallelUpload({ client, s3KeyName, filePath })
pipelinePromise(inputStream, uploadStream)
pipeline(inputStream, uploadStream)
await upload.done()
if (!vod_id) throw new Error('vod_id was missing from payload');
@ -198,7 +285,7 @@ export const combine_video_segments: Task = async function (payload: unknown, he
} catch (e: any) {
helpers.logger.error('combined_video_segments failed')
helpers.logger.error('combine_video_segments failed')
if (e instanceof Error) {
helpers.logger.error(e.message)
} else {
@ -207,6 +294,42 @@ export const combine_video_segments: Task = async function (payload: unknown, he
throw e
}
}
/**
* doSoloRequest
*
* Solo combine_video_segments is when the requester is giving us a s3_manifest
* Which is just a S3FileRecord[]
*
* It's our job to download those files and combine them, then upload the result to S3
*/
async function doSoloCombine(helpers: Helpers, s3_manifest: S3FileRecord[]) {
const s3Manifest = s3_manifest
const inputVideoFilePaths = await Promise.all(s3Manifest.filter((m) => (m.bytes !== 0)).map((m) => downloadFile(client, configs.s3UscBucket, m.key)))
}
export const combine_video_segments: Task = async function (payload: unknown, helpers: Helpers) {
// helpers.logger.info('the following is the raw Task payload')
// helpers.logger.info(payload)
// helpers.logger.info(JSON.stringify(payload?.s3_manifest))
assertPayload(payload)
const { s3_manifest, vod_id } = payload
if (!vod_id) throw new Error('combine_video_segments was called without a vod_id.');
helpers.logger.info(`🏗️ combine_video_segments started with s3_manifest=${JSON.stringify(s3_manifest)}, vod_id=${vod_id}`)
const isSoloRequest = (!!vod_id && !s3_manifest)
const isIntegratedRequest = (!!s3_manifest?.length && !vod_id)
if (isSoloRequest && !isIntegratedRequest) {
await doSoloCombine(helpers, s3_manifest!)
} else if (isIntegratedRequest && !isSoloRequest) {
await doIntegratedCombine(helpers, vod_id)
} else {
throw new Error(`Ambiguous request. Use either s3_manifest or vod_id argument, not both.`);
}
}

View File

@ -6,25 +6,21 @@ import patchVodInDatabase from "@futureporn/fetchers/patchVodInDatabase.ts";
import { configs } from "../config";
interface Payload {
vod_id: string;
vod_id?: string;
video_url?: string;
}
function assertPayload(payload: any): asserts payload is Payload {
if (typeof payload !== "object" || !payload) throw new Error("invalid payload (it must be an object)");
if (typeof payload.vod_id !== "string") throw new Error("payload.vod_id was not a string");
if (typeof payload.vod_id !== "string" && typeof payload.video_url !== "string") throw new Error("payload requires either vod_id or video_url, however both were not a string.");
}
async function doIntegratedRequest(vodId: string): Promise<void> {
export const generate_thumbnail: Task = async function (payload: unknown, helpers: Helpers) {
assertPayload(payload)
const { vod_id } = payload
if (!vod_id) throw new Error('generate_thumbnail Task was started without a vod_id. This is unsupported.');
helpers.logger.info(`🏗️ generate_thumbnail started with vod_id=${vod_id}`);
const vod = await getVod(vod_id)
const vod = await getVod(vodId)
const s3_file = vod?.s3_file
if (!s3_file) throw new Error(`vod ${vod_id} was missing a s3_file.`);
if (!s3_file) throw new Error(`vod ${vodId} was missing a s3_file.`);
// we need to get a CDN url to the vod so we can download chunks of the file in order for Prevvy to create the storyboard image.
const cdnUrl = getCdnUrl(configs.s3MainBucket, s3_file.s3_key)
@ -41,15 +37,48 @@ export const generate_thumbnail: Task = async function (payload: unknown, helper
s3Endpoint: configs.s3Region,
s3Region: configs.s3Region
}
const upload = await uploadFile(uploadArgs)
if (!upload) throw new Error(`failed to upload ${tmpImagePath} to S3`);
if (!upload.Key) throw new Error(`failed to upload ${tmpImagePath} to S3 (upload.Key was missing)`);
// we need to create a S3 file in the db
const thumbnail = getCdnUrl(configs.s3MainBucket, upload.Key)
await patchVodInDatabase(vod_id, { thumbnail })
await patchVodInDatabase(vodId, { thumbnail })
}
async function doSoloRequest(videoUrl: string): Promise<string> {
return await getThumbnailUrl(videoUrl)
}
export const generate_thumbnail: Task = async function (payload: unknown, helpers: Helpers) {
assertPayload(payload)
const { vod_id, video_url } = payload
helpers.logger.info(`🏗️ generate_thumbnail started with vod_id=${vod_id}, video_url=${video_url}`);
// Determine what kind of request is being made.
// It could be one of two scenarios
// * Here is a VOD record in the database, please update it's thumbnail. (integratedRequest)
// * Here is a video URL, please give us a thumbnail URL. (soloRequest)
//
const integratedRequest = (!!vod_id && !video_url)
const soloRequest = (!!video_url && !vod_id)
if (integratedRequest) {
await doIntegratedRequest(vod_id)
} else if (soloRequest) {
await getThumbnailUrl(video_url)
} else {
throw new Error(`unsupported ambiguous request!`)
}
}
export default generate_thumbnail

View File

@ -199,34 +199,6 @@ importers:
specifier: ^5.5.4
version: 5.5.4
../..: {}
../../packages/fetchers: {}
../../packages/infra: {}
../../packages/storage: {}
../../packages/types: {}
../../packages/utils: {}
../bot: {}
../factory: {}
../mailbox: {}
../migrations: {}
../next: {}
../scout: {}
../strapi: {}
../uppy: {}
packages:
'@aws-crypto/crc32@5.2.0':

View File

@ -3,83 +3,39 @@
*/
import { VodResponse } from "@futureporn/types"
import getVod from '@futureporn/fetchers/getVod.ts'
import { PassThrough, Readable, type Writable } from "stream"
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { ua0 } from '@futureporn/utils/name.ts'
import { spawn } from 'child_process'
import { ChildProcessByStdio, spawn } from 'child_process'
import { pipeline } from "stream/promises"
import { configs } from "./config"
import { nanoid } from 'nanoid'
import { Upload, type Progress } from "@aws-sdk/lib-storage"
import { S3Client, type S3ClientConfig } from '@aws-sdk/client-s3'
import { S3Client } from '@aws-sdk/client-s3'
import prettyBytes from "pretty-bytes"
import updateSegmentInDatabase from "@futureporn/fetchers/updateSegmentInDatabase.ts"
import { getRecordingRelatedToVod } from "@futureporn/fetchers/getRecording.ts"
import createSegment from "@futureporn/fetchers/createSegment.ts"
import createSegmentsVodLink from "@futureporn/fetchers/createSegmentsVodLink.ts"
import { createReadStream, createWriteStream } from "fs"
import pRetry, {AbortError} from 'p-retry'
import pRetry from 'p-retry'
import { type SegmentResponse } from '@futureporn/types'
import getPlaylistUrl from "@futureporn/fetchers/getPlaylistUrl.ts"
import { isBefore, sub } from 'date-fns'
import { isBefore, isAfter, sub } from 'date-fns'
import { setTimeout } from 'node:timers/promises';
import {
RoomOfflineError,
PlaylistFailedError,
AdminAbortedError,
ExhaustedRetriesError,
} from '@futureporn/utils/error.ts'
export interface RecordNextGenerationArguments {
vodId: string;
url: string;
}
export class AdminAbortedError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, AdminAbortedError.prototype)
this.name = this.constructor.name
this.message = `AdminAbortedError. ${this.message}`
}
getErrorMessage() {
return this.message
}
}
export class UploadFailedError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, UploadFailedError.prototype)
this.name = this.constructor.name
this.message = `UploadFailedError. ${this.message}`
}
getErrorMessage() {
return this.message
}
}
export class PlaylistFailedError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, PlaylistFailedError.prototype)
this.name = this.constructor.name
this.message = `PlaylistFailedError. ${this.message}`
}
getErrorMessage() {
return this.message
}
}
export class DownloadFailedError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, DownloadFailedError.prototype)
this.name = this.constructor.name
this.message = `DownloadFailedError. ${this.message}`
}
getErrorMessage() {
return this.message
}
}
/**
@ -89,21 +45,51 @@ export class DownloadFailedError extends Error {
*
* ## Issues/TODO list
*
* @todo [x] onProgress stops firing
* @todo [x] OOMKilled seen via development environment
* @done [x] onProgress stops firing
* @done [x] OOMKilled seen via development environment
* @todo [ ] undefined behavior during CB private shows
* @todo [ ] does not handle CB Hidden Camera ticket shows
* @todo [ ] Upload segments in a way that does not interrupt downloading new segments.
* @done [x] Upload segments in a way that does not interrupt downloading new segments.
* There is an issue where a segment download ends, and the segment upload immediately begins.
* At first glance this looks like good behavior, but what is happening during the segment upload is that the livestream
* is continuing, but we aren't recording it anymore. We are using Backblaze, thus uploads are slow.
* We miss a lot of the stream because the upload takes many minutes.
* Instead of this behavior of immediately uploading after a segment download completes, we should upload once the livestream is finished,
* OR we should upload while concurrently downloading the next segment.
* @todo [ ] Move retrying from the {Task} `record` context to the class `RecordNextGeneration` context.
* @done [x] Move retrying from the {Task} `record` context to the class `RecordNextGeneration` context.
* There is an issue where the `record` task needs to retry after a temporary failure, but it cannot because there aren't any available workers.
* The solution is to not exit the `record` task at all, and instead keep the `record` task running, but suspended while a exponential backoff timer elapses.
* This way, the worker stays focused on the recording and retries until the stream has been offline for n minutes, at which point `record` is complete.
* @done [x] Abort process results in corrupted .ts files that ffmpeg/vlc/kdenlive cannot read. Granted, aborted vods are those which are NOT desirable to save, so maybe we ignore this issue?
* @done [x] RecordNextGeneration gives up immediately in response to RoomOffline. It must retry until 5 minutes have elapsed.
* @done [x] .bytes and .bytes_uploaded do not match at the end of the upload. .bytes_uploaded is curiously larger than .bytes!
* @todo [ ] Two jobs can occur concurrently (technically env var WORKER_CONCURRENCY allows for >1 so this isn't a bug)
* I set it to 1. Now we just need to test that only 1 job can happen at a time.
*
* ```
* updateSegmentBytes() Segment 0 -- segment.id=2bf39450-2911-4f26-aca6-41cf092fd5e6 .bytes=6017952380 .bytes_uploaded=4571791360 .s3_key=MufqGsVF2hY6sAtN5rmrX.ts .vod.id=cbc80caf-73e5-41e5-8bbb-2d03ca234aca
updateSegmentBytes() Segment 0 -- segment.id=2bf39450-2911-4f26-aca6-41cf092fd5e6 .bytes=6017952380 .bytes_uploaded=4592762880 .s3_key=MufqGsVF2hY6sAtN5rmrX.ts .vod.id=cbc80caf-73e5-41e5-8bbb-2d03ca234aca
updateSegmentBytes() Segment 0 -- segment.id=2bf39450-2911-4f26-aca6-41cf092fd5e6 .bytes=6017952380 .bytes_uploaded=4613734400 .s3_key=MufqGsVF2hY6sAtN5rmrX.ts .vod.id=cbc80caf-73e5-41e5-8bbb-2d03ca234aca
updateSegmentBytes() Segment 0 -- segment.id=2bf39450-2911-4f26-aca6-41cf092fd5e6 .bytes=6017952380 .bytes_uploaded=4624220160 .s3_key=MufqGsVF2hY6sAtN5rmrX.ts .vod.id=cbc80caf-73e5-41e5-8bbb-2d03ca234aca
updateSegmentBytes() Segment 0 -- segment.id=2bf39450-2911-4f26-aca6-41cf092fd5e6 .bytes=6017952380 .bytes_uploaded=4645191680 .s3_key=MufqGsVF2hY6sAtN5rmrX.ts .vod.id=cbc80caf-73e5-41e5-8bbb-2d03ca234aca
during startProgressReports(), we encountered the following error.
TypeError: fetch failed
at node:internal/deps/undici/undici:13178:13
at process.processTicksAndRejections (node:internal/process/task_queues:95:5)
at async updateSegmentInDatabase (/app/packages/fetchers/src/updateSegmentInDatabase.ts:29:15)
at async RecordNextGeneration.updateSegmentBytes (/app/services/capture/src/RecordNextGeneration.ts:342:21)
at async RecordNextGeneration.updateDatabaseRecords (/app/services/capture/src/RecordNextGeneration.ts:326:5)
at async Timeout._onTimeout (/app/services/capture/src/RecordNextGeneration.ts:352:9) {
[cause]: Error: getaddrinfo EAI_AGAIN postgrest.futureporn.svc.cluster.local
at GetAddrInfoReqWrap.onlookupall [as oncomplete] (node:dns:120:26) {
errno: -3001,
code: 'EAI_AGAIN',
syscall: 'getaddrinfo',
hostname: 'postgrest.futureporn.svc.cluster.local'
}
}
```
*
*
*/
export default class RecordNextGeneration {
@ -116,6 +102,7 @@ export default class RecordNextGeneration {
public tmpDiskPath?: string;
private vod?: VodResponse | null;
private downloadStream?: Readable;
private downloadProcess?: ChildProcessByStdio<Writable, Readable, null>;
private uploadStream?: PassThrough;
private uploadInstance?: Upload;
private diskStream?: Writable;
@ -139,6 +126,7 @@ export default class RecordNextGeneration {
this.abortController.signal.addEventListener("abort", this.abortEventListener.bind(this))
this.retries = 0
this.segments = []
}
async withRetry(fn: any, retries = 3) {
@ -153,61 +141,67 @@ export default class RecordNextGeneration {
abortEventListener() {
console.log(`abortEventListener has been invoked. this.abortSignal is as follows`)
console.log(this.abortController.signal)
console.log(JSON.stringify(this.abortController.signal, null, 2))
const reason = this.abortController.signal.reason
if (this.downloadStream) {
console.log(`aborted the stream download with reason=${reason}`)
this.downloadStream.destroy(new AdminAbortedError())
} else {
console.warn(`downloadStream does not exist. Perhaps it has already been aborted?`)
}
}
getMultipartUpload({
client,
bucket,
key,
body,
}: {
client: S3Client,
bucket: string,
key: string,
body: Readable,
}) {
const params = {
Bucket: bucket,
Key: key,
Body: body
}
const upload = new Upload({
client,
partSize: 1024 * 1024 * 5,
queueSize: 1,
// @see https://github.com/aws/aws-sdk-js-v3/issues/2311
// tl;dr: the variable name, 'leavePartsOnError' is not representative of the behavior.
// It should instead be interpreted as, 'throwOnPartsError'
leavePartsOnError: true,
params
})
/**
* aws client docs recommend against using async onProgress handlers.
* therefore, I'm only setting this.uploadCounter inside the syncronous handler and we call async updateSegmentInDatabase() elsewhere.
*/
const onProgress = (progress: Progress) => {
if (progress?.loaded) {
console.log(`Upload progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`)
this.reportMemoryUsage()
this.uploadCounter = progress.loaded
// console.log(JSON.stringify(this.abortController.signal, null, 2))
// const reason = this.abortController.signal.reason
if (this.downloadProcess) {
// console.log(`aborted the stream process with reason=${reason}`)
// we want to send SIGINT to ffmpeg rather than forcefully destroying the stream.
// this prevents the downloaded .ts file from being corrupted.
// this.downloadStream.destroy(new AdminAbortedError())
this.downloadProcess.kill('SIGINT');
if (this.downloadStream) {
this.downloadStream.emit('error', new AdminAbortedError());
}
} else {
console.warn(`downloadProcess does not exist. Perhaps it has already been aborted?`)
}
upload.on("httpUploadProgress", onProgress);
return upload
}
// getMultipartUpload({
// client,
// bucket,
// key,
// body,
// }: {
// client: S3Client,
// bucket: string,
// key: string,
// body: Readable,
// }) {
// const params = {
// Bucket: bucket,
// Key: key,
// Body: body
// }
// const upload = new Upload({
// client,
// partSize: 1024 * 1024 * 5,
// queueSize: 1,
// // @see https://github.com/aws/aws-sdk-js-v3/issues/2311
// // tl;dr: the variable name, 'leavePartsOnError' is not representative of the behavior.
// // It should instead be interpreted as, 'throwOnPartsError'
// leavePartsOnError: true,
// params
// })
// /**
// * aws client docs recommend against using async onProgress handlers.
// * therefore, I'm only setting this.uploadCounter inside the syncronous handler and we call async updateSegmentInDatabase() elsewhere.
// */
// const onProgress = (progress: Progress) => {
// if (progress?.loaded) {
// console.log(`Upload progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`)
// this.reportMemoryUsage()
// this.uploadCounter = progress.loaded
// }
// }
// upload.on("httpUploadProgress", onProgress);
// return upload
// }
// @todo there is a problem that results in COMPLETE LOSS OF SEGMENT DATA.
// when the download stream closes before the upload stream, I think the upload stream gets cut off.
// this means the upload isn't allowed to save and that means no data whatsoever gets put to S3.
@ -227,11 +221,12 @@ export default class RecordNextGeneration {
static getDiskStream(s3Key?: string) {
const tmpDiskPath = join(tmpdir(), s3Key || `${nanoid()}.ts`)
const tmpDiskPath = join(configs.cacheDir, s3Key || `${nanoid()}.ts`)
console.log(`getDiskStream() tmpDiskPath=${tmpDiskPath}`)
return createWriteStream(tmpDiskPath, { encoding: 'utf-8' })
}
static getFFmpegStream({ playlistUrl }: { playlistUrl: string }): Readable {
static getFFmpeg({ playlistUrl }: { playlistUrl: string }): ChildProcessByStdio<Writable, Readable, null> {
console.log(`getFFmpegStream using playlistUrl=${playlistUrl}`)
const ffmpegProc = spawn('ffmpeg', [
'-headers', `"User-Agent: ${ua0}"`,
@ -247,14 +242,19 @@ export default class RecordNextGeneration {
// ignoring stderr is important because if not, ffmpeg will fill that buffer and node will hang
stdio: ['pipe', 'pipe', 'ignore']
})
return ffmpegProc.stdout
return ffmpegProc
}
onUploadProgress(progress: Progress) {
if (progress?.loaded) {
console.log(`Upload progress! ${progress.loaded} bytes loaded (${prettyBytes(progress.loaded)}).`)
onUploadProgress(segment: SegmentResponse, progress: Progress) {
// console.log('onUploadProgress() running now. progress as follows')
// console.log(progress)
// find the matching segment and update it's bytes_uploaded
const matchingSegment = this.segments.find((s) => s.id === segment.id)
if (progress?.loaded && matchingSegment) {
matchingSegment.bytes_uploaded = progress.loaded
this.reportMemoryUsage()
this.uploadCounter = progress.loaded
}
}
@ -273,25 +273,9 @@ export default class RecordNextGeneration {
if (mem.rss > 256000000) console.warn(`High memory usage! ${JSON.stringify(this.formatMemoryStats(mem))}`);
}
// handleClosedStream() {
// if (this.downloadStream?.destroyed && this.uploadStream?.destroyed) {
// console.log(`both downloadStream and uploadStream are destroyed which suggests success.`)
// console.log(`we need to cleanly exit`)
// }
// else if (this.downloadStream?.destroyed && !this.uploadStream?.destroyed) {
// console.log(`downloadStream was destroyed but uploadStream was not, which suggests a download failure.`)
// console.log(`we need to do nothing and wait for the uploadStream to finish.`)
// }
// else if (!this.downloadStream?.destroyed && this.uploadStream?.destroyed) {
// console.log(`uploadStream was destroyed but downloadStream was not, which suggests an upload failure.`)
// console.log(`we need to throw immediately so record() can retry and start a new segment.`)
// throw new UploadFailedError()
// }
// }
getS3Client() {
const clientOptions: S3ClientConfig = {
const clientOptions: any = {
endpoint: configs.s3Endpoint,
region: configs.s3Region,
credentials: {
@ -311,52 +295,34 @@ export default class RecordNextGeneration {
getNames() {
const s3Key = `${nanoid()}.ts`
const tmpDiskPath = join(tmpdir(), s3Key)
const tmpDiskPath = join(configs.cacheDir, s3Key)
console.log(`tmpDiskPath=${tmpDiskPath}`)
return { tmpDiskPath, s3Key }
}
/**
* getDatabaseRecords
*
* Get records from the database
* * vod
* * segment
* * segment_vod_link
*/
// async getDatabaseRecords() {
// this.vod = await getVod(this.vodId)
// if (!this.vod) throw new Error(`RecordNextGeneration failed to fetch vod ${this.vodId}`);
// if (this.vod.recording.is_aborted) throw new AdminAbortedError();
// if (!this.s3Key) throw new Error('getDatabaseRecords() requires this.s3Key, but it was undefined.');
// const segmentId = await createSegment(this.s3Key, this.vodId)
// const segmentVodLinkId = await createSegmentsVodLink(this.vodId, this.segmentId)
// if (!this.vod) throw new Error('after getDatabaseRecords() ran, this.vod was missing.');
// if (!segmentId) throw new Error('after getDatabaseRecords() ran, segmentId was missing.');
// if (!segmentVodLinkId) throw new Error('after getDatabaseRecords() ran, segmentVodLinkId was missing.');
// return { segmentId, segmentVodLinkId }
// }
static async _dl(url: string, s3Key: string) {
const playlistUrl = await getPlaylistUrl(url)
if (!playlistUrl) throw new PlaylistFailedError();
const ffmpegStream = RecordNextGeneration.getFFmpegStream({ playlistUrl })
const { data, error } = (await getPlaylistUrl(url))
if (!data) throw new PlaylistFailedError();
if (error) {
if (error === 'PlaylistFailedError') throw new PlaylistFailedError();
if (error === 'RoomOfflineError') throw new RoomOfflineError();
}
console.log(`_dl playlistUrl=${data.playlistUrl}`)
const ffmpegProcess = RecordNextGeneration.getFFmpeg({ playlistUrl: data.playlistUrl })
const ffmpegStream = ffmpegProcess.stdout
const diskStream = RecordNextGeneration.getDiskStream(s3Key)
const streamPipeline = pipeline(ffmpegStream, diskStream)
return {
return {
pipeline: streamPipeline,
ffmpegStream,
ffmpegProcess,
diskStream,
}
}
static async _ul(client: S3Client, diskPath: string, key: string) {
const diskStream = createReadStream(diskPath, { encoding: 'utf-8' })
const diskStream = createReadStream(diskPath)
const params = {
Bucket: configs.s3UscBucket,
@ -365,7 +331,7 @@ export default class RecordNextGeneration {
}
const uploadInstance = new Upload({
client,
partSize: 1024 * 1024 * 5,
partSize: 1024 * 1024 * 10,
queueSize: 1,
// @see https://github.com/aws/aws-sdk-js-v3/issues/2311
// tl;dr: the variable name, 'leavePartsOnError' is not representative of the behavior.
@ -380,62 +346,6 @@ export default class RecordNextGeneration {
}
}
async upload() {
const s3Client = this.s3Client
const s3Key = this.s3Key
const tmpDiskPath = this.tmpDiskPath
if (!s3Client) throw new Error('s3Client was missing during upload()');
if (!s3Key) throw new Error('s3Key was missing during upload()');
if (!tmpDiskPath) throw new Error('tmpDiskPath was missing during upload()');
const fileStream = createReadStream(tmpDiskPath, { encoding: 'utf-8' })
this.uploadInstance = this.getMultipartUpload({ client: s3Client, bucket: configs.s3UscBucket, key: s3Key, body: fileStream })
await this.uploadInstance.done()
}
/**
* # handleExceptions
*
* We want to handle any exceptions that are thrown, so our process continues running.
* Ideally we know every failure scenario and we handle it graceefully.
* If we ever reach the default: case below, it's a bug and we need to patch it.
*/
handleExceptions (e: any, phase?: string) {
console.info(`handleExceptions is called during phase=${phase} with e.name=${e.name} e instanceof Error?=${e instanceof Error} e.message=${e.message}`)
if (e instanceof Error && e.name === 'RoomOfflineError') {
// if the room is offline, we re-throw the RoomOfflineError so the recording gets retried
// we do this because the offline might be a temporary situation.
// e.g. streamer's computer bluescreened and they're coming back after they reboot.
// @todo try again immediately
} else if (e instanceof Error && e.name === 'PlaylistFailedError') {
// sometimes @futureporn/scout fails to get the playlist URL. We want to immediately try again.
// @todo try again immediately
} else if (e instanceof Error && e.name === 'AdminAbortedError') {
// An admin aborted the recording which means we don't want to retry recording.
// we return <void> which causes the 'record' Task to be marked as successful.
console.log(`clear as day, that is an AdminAbortedError! ❤️`)
return
} else if (e instanceof Error && e.name === 'DownloadFailedError') {
throw e
} else if (e instanceof Error && e.message === 'no tomes available') {
console.error(`Received a 'no tomes available' error from S3 which ususally means they're temporarily overloaded.`)
throw e
} else if (e instanceof Error && e.name === 'UploadFailedError') {
throw e
} else {
console.error(`!!!!!!!!!!!!!! 🚩🚩🚩 handleExceptions did not find a known scenario which should probably never happen. Please patch the code to handle this scenario.`)
console.error((e instanceof Error) ? `(e instanceof Error)=${(e instanceof Error)}, e.message='${e.message}', e.name='${e.name}'` : JSON.stringify(e))
}
}
async updateDatabaseRecords() {
await this.updateSegmentBytes()
@ -453,8 +363,10 @@ export default class RecordNextGeneration {
async updateSegmentBytes() {
for (const [index, segment] of this.segments.entries()) {
if (segment.id) {
console.log(`updateSegmentBytes() Segment ${index} -- segment.id=${segment.id} segments.bytes=${segment.bytes}`)
await updateSegmentInDatabase({ segment_id: segment.id, fileSize: segment.bytes })
console.log(`updateSegmentBytes() Segment ${index} -- segment.id=${segment.id} .bytes=${segment.bytes} .bytes_uploaded=${segment.bytes_uploaded} .s3_key=${segment.s3_key} .vod.id=${segment?.vod?.id}`)
const seg = await updateSegmentInDatabase({ segment_id: segment.id, bytes: segment.bytes, bytes_uploaded: segment.bytes_uploaded })
let matchingSegment = this.segments.find((s) => s.id === seg.id)
if (matchingSegment) matchingSegment = seg // update the locally cached segment so update_at is accurate (used during isTryingDownload())
}
}
}
@ -480,14 +392,17 @@ export default class RecordNextGeneration {
* There are always more tries unless the stream has been offline for greater than 5 minutes.
*/
isTryingDownload() {
const isSegmentPresent = (this.segments && this.segments?.length > 0)
const isSegmentPresent = (!!this.segments && this.segments?.length > 0)
// console.log(`isTryingDownload() this.segments=${this.segments}, this.segments.length=${this.segments?.length}, isSegmentPresent=${isSegmentPresent} `);
if (!isSegmentPresent) return true;
const latestSegment = this.segments.at(-1)
const hasUpdatedTimestamp = latestSegment?.updated_at
if (!hasUpdatedTimestamp) throw new Error('latestSegment does not have an updated_at property');
const fiveMinsAgo = sub(new Date(), { minutes: 5 })
const lastUpdatedAt = latestSegment.updated_at
return (isBefore(lastUpdatedAt, fiveMinsAgo)) ? true : false;
console.log(`isTryingDownload fiveMinsAgo=${fiveMinsAgo.toISOString()}, lastUpdatedAt=${lastUpdatedAt}`)
return isAfter(lastUpdatedAt, fiveMinsAgo)
}
@ -505,8 +420,14 @@ export default class RecordNextGeneration {
async done() {
this.startProgressReports();
try {
console.log(`>> downloadSegments() begin.`)
await this.downloadSegments();
console.log(`>> downloadsSegments() finished. ${this.segments.length} downloaded.`)
console.log(`waiting 30 seconds for downloadSegments() to close ffmpeg ...`) // @todo potential speed optimization is to proceed as soon as file is done being written to
await setTimeout(30*1000)
console.log(`>> uploadSegments() begin.`)
await this.uploadSegments();
console.log(`>> uploadSegments() finished.`)
} catch (e) {
console.error(`An error was encountered during done() function. This should not happen under nominal scenarios. This may be a bug; please investigate.`)
throw e
@ -528,11 +449,13 @@ export default class RecordNextGeneration {
if (!segment.id) {
throw new Error(`Failed to createSegment(). segment.id was missing.`);
}
console.log(`New segment created. @see http://localhost:9000/segments?id=eq.${segment.id}&select=*,vods(*,recordings(*))`)
console.log(`New segment created. @see http://localhost:9000/segments?id=eq.${segment.id}&select=*,vod:vods(*,recordings(*))`)
this.segments.push(segment)
const { pipeline, ffmpegStream } = (await RecordNextGeneration._dl(this.url, s3_key))
const { pipeline, ffmpegStream, ffmpegProcess } = (await RecordNextGeneration._dl(this.url, s3_key))
if (this.downloadStream) throw new Error(`If you see this error, there is a bug in your code. downloadSegment() tried to use this.downloadStream but it was already being used by some other part of the code. Please refactor so this.downloadStream is not used by more than one function at any given time.`);
this.downloadStream = ffmpegStream
this.downloadProcess = ffmpegProcess
console.log(`setting downloadProcess as follows. ${ffmpegProcess}`)
ffmpegStream.on('data', (data) => {
let mySegment = this.segments.find((s) => s.id === segment.id)
if (mySegment) {
@ -554,13 +477,28 @@ export default class RecordNextGeneration {
*/
async downloadSegments(): Promise<void> {
try {
await this.downloadSegment()
while (this.isTryingDownload()) {
// Backoff timer, in case of followup segments.
// if most recent segment was created greater than 1 minute ago, there is no timer. (immediate retry)
// else the retry timer is 30 seconds (wait before retrying)
const mostRecentSegmentCreationTimestamp = this.segments.at(-1)?.created_at
if (mostRecentSegmentCreationTimestamp) {
const thirtySecondsAgo = sub(new Date(), { seconds: 30 })
const mostRecentSegmentCreation = new Date(mostRecentSegmentCreationTimestamp)
const isMostRecentSegmentCreatedGreaterThanOneMinuteAgo = isBefore(mostRecentSegmentCreation, thirtySecondsAgo)
if (isMostRecentSegmentCreatedGreaterThanOneMinuteAgo) console.log(`Waiting 30 seconds before next downloadSegment().`);
await setTimeout((isMostRecentSegmentCreatedGreaterThanOneMinuteAgo) ? 1000*30 : 0)
}
await this.downloadSegment()
}
} catch (e) {
if (e instanceof Error && e.name === 'RoomOfflineError') {
// If the room is offline, then we want to retry immediately.
// We do this because the offline room might be a temporary situation.
// e.g. streamer's computer bluescreened and they're coming back after they reboot.
// If the room has been offline for >5 minutes, then we consider the stream concluded and we return.
console.warn('Room is offline! ~~ lets try again, if appropriate.')
console.log(`isTryingDownload()=${this.isTryingDownload()}`)
if (this.isTryingDownload()) {
return this.downloadSegments()
} else {
@ -568,7 +506,7 @@ export default class RecordNextGeneration {
}
} else if (e instanceof Error && e.name === 'PlaylistFailedError') {
// sometimes @futureporn/scout fails to get the playlist URL. We want to immediately try again.
console.error(`sometimes @futureporn/scout fails to get the playlist URL. We want to immediately try again.`)
return this.downloadSegments()
} else if (e instanceof Error && e.name === 'AdminAbortedError') {
@ -594,7 +532,7 @@ export default class RecordNextGeneration {
*/
async uploadSegments() {
try {
for (const segment of this.segments) {
for (const segment of this.segments.filter((s) => s.bytes > 0)) {
await this.uploadSegment(segment)
}
} catch (e) {
@ -605,79 +543,28 @@ export default class RecordNextGeneration {
}
async uploadSegment(segment: SegmentResponse) {
const diskPath = join(tmpdir(), segment.s3_key)
const diskPath = join(configs.cacheDir, segment.s3_key)
const key = segment.s3_key
const client = this.getS3Client()
await pRetry(async (attemptCount: number) => {
console.log(`uploadSegment() attempt ${attemptCount}`)
if (!this.s3Client) throw new Error('S3Client')
const { uploadInstance } = (await RecordNextGeneration._ul(client, diskPath, key))
uploadInstance.on('httpUploadProgress', () => this.onUploadProgress)
uploadInstance.on('httpUploadProgress', (progress: Progress) => this.onUploadProgress(segment, progress))
return uploadInstance.done()
}, {
onFailedAttempt: (e) => {
console.error(`failed to uploadSegment() with the following error. Retrying.`)
console.error(`failed to uploadSegment() with the following error. Retrying with ${e.retriesLeft} retries left.`)
console.error(e)
},
retries: 9
})
const matchingSegment = this.segments.find((s) => s.id === segment.id)
if (matchingSegment?.bytes_uploaded && matchingSegment?.bytes) {
matchingSegment.bytes_uploaded = matchingSegment.bytes
}
}
}
// async done_old() {
// /**
// * Errors thrown inside the setTimeout callback will end up as an uncaughtException.
// * We handle those errors here to prevent node from exiting.
// */
// process.on('uncaughtException', (e) => {
// this.stopProgressReports()
// console.log(`!!! 🚩 WE HAVE CAUGHT AN UNCAUGHT EXCEPTION. (This should never occur. This is probably a bug that needs to be fixed.) error as follows.`)
// console.log(e)
// process.exit(69)
// })
// try {
// const client = this.getS3Client()
// console.log(`>> 1. Segment downloading phase`)
// while (this.isTryingDownload()) {
// if (!this.databaseUpdateTimer) this.startProgressReports();
// const s3_key = `${nanoid()}.ts`
// const segment = await createSegment(s3_key, this.vodId)
// if (!segment.id) {
// console.log('the following is segment')
// console.log(segment)
// throw new Error(`received invalid segment from db fetch()`);
// }
// this.segments.push(segment)
// console.log(`New segment created. @see http://localhost:9000/segments?id=eq.${segment.id}&select=*,vods(*,recordings(*))`)
// const { pipeline, ffmpegStream } = (await RecordNextGeneration._dl(this.url, s3_key))
// this.downloadStream = ffmpegStream
// ffmpegStream.on('data', (data) => {
// let mSegment = this.segments.find((s) => s.id === segment.id)
// if (mSegment) {
// mSegment.bytes += data.length;
// }
// })
// await pipeline
// }
// console.log(`>> 2. Segment uploading phase`)
// } catch (e) {
// this.stopProgressReports()
// return this.handleExceptions(e, '_dl()|_ul()')
// }
// }
// }

View File

@ -9,6 +9,8 @@ const requiredEnvVars = [
'S3_USC_BUCKET',
'POSTGREST_URL',
'AUTOMATION_USER_JWT',
'CACHE_DIR',
'WORKER_CONCURRENCY',
] as const;
const getEnvVar = (key: typeof requiredEnvVars[number]): string => {
@ -28,6 +30,8 @@ export interface Config {
s3Region: string;
s3UscBucket: string;
s3Endpoint: string;
cacheDir: string;
workerConcurrency: number;
}
@ -40,4 +44,6 @@ export const configs: Config = {
s3Region: getEnvVar('S3_REGION'),
s3UscBucket: getEnvVar('S3_USC_BUCKET'),
s3Endpoint: getEnvVar('S3_ENDPOINT'),
cacheDir: getEnvVar('CACHE_DIR'),
workerConcurrency: parseInt(getEnvVar('WORKER_CONCURRENCY')),
}

View File

@ -3,7 +3,7 @@
import { build } from './app.ts'
import 'dotenv/config'
import { configs } from './config.ts'
import { makeWorkerUtils, type WorkerUtils, Runner, RunnerOptions, run as graphileRun } from 'graphile-worker'
import { join, dirname } from 'node:path';
import { fileURLToPath } from 'url';
@ -17,11 +17,10 @@ const version = getPackageVersion(join(__dirname, '../package.json'))
if (!process.env.FUNCTION) throw new Error(`FUNCTION env var was missing. FUNCTION env var must be either 'api' or 'worker'.`);
if (!process.env.WORKER_CONNECTION_STRING) throw new Error(`WORKER_CONNECTION_STRING env var was missing`);
const connectionString = process.env.WORKER_CONNECTION_STRING!
const concurrency = (process.env?.WORKER_CONCURRENCY) ? parseInt(process.env.WORKER_CONCURRENCY) : 1
const preset: GraphileConfig.Preset = {
worker: {
connectionString: process.env.WORKER_CONNECTION_STRING,
concurrentJobs: concurrency,
concurrentJobs: configs.workerConcurrency,
fileExtensions: [".js", ".ts"],
},
};
@ -59,7 +58,7 @@ async function doRunWorker(workerUtils: WorkerUtils) {
const runnerOptions: RunnerOptions = {
preset,
concurrency,
concurrency: configs.workerConcurrency,
taskDirectory: join(__dirname, 'tasks'),
}

View File

@ -0,0 +1,33 @@
-- builds table schema
CREATE TABLE api.builds (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
vod UUID NOT NULL REFERENCES api.vods(id),
task TEXT NOT NULL,
created_at timestamp(6) without time zone,
updated_at timestamp(6) without time zone
);
-- roles & permissions for our backend automation user
GRANT all ON api.builds TO automation;
GRANT SELECT ON api.builds TO web_anon;
-- trigger function for starting the appropriate task when a new api.builds row is added
CREATE FUNCTION public.tg__add_build_job() RETURNS trigger
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'pg_catalog', 'public', 'pg_temp'
AS $$
begin
PERFORM graphile_worker.add_job(NEW.task, json_build_object(
'vod', NEW.vod
), max_attempts := 6);
return NEW;
end;
$$;
CREATE TRIGGER create_build
AFTER UPDATE ON api.builds
FOR EACH ROW
EXECUTE FUNCTION tg__add_build_job();

View File

@ -0,0 +1,14 @@
ALTER TABLE api.builds
ALTER COLUMN created_at SET DEFAULT now();
ALTER TABLE api.builds
ALTER COLUMN updated_at SET DEFAULT now();
CREATE TRIGGER update_builds
BEFORE UPDATE ON api.builds
FOR EACH ROW
EXECUTE PROCEDURE moddatetime(updated_at);

View File

@ -0,0 +1,7 @@
-- fixing a mistake. I accidentally did AFTER UPDATE when I wanted AFTER INSERT.
DROP TRIGGER create_build ON api.builds;
CREATE TRIGGER create_build
AFTER INSERT ON api.builds
FOR EACH ROW
EXECUTE FUNCTION tg__add_build_job();

View File

@ -0,0 +1,2 @@
ALTER TABLE api.segments
ADD COLUMN checksum TEXT;

View File

@ -0,0 +1,8 @@
-- for consistency, using vod_id instead of vod
ALTER TABLE api.builds
DROP COLUMN vod;
ALTER TABLE api.builds
ADD COLUMN vod_id UUID NOT NULL REFERENCES api.vods(id);

View File

@ -0,0 +1,22 @@
-- we've renamed api.builds.vod to api.builds.vod_id
DROP FUNCTION public.tg__add_build_job CASCADE;
CREATE FUNCTION public.tg__add_build_job() RETURNS trigger
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'pg_catalog', 'public', 'pg_temp'
AS $$
begin
PERFORM graphile_worker.add_job(NEW.task, json_build_object(
'vod_id', NEW.vod_id
), max_attempts := 6);
return NEW;
end;
$$;
CREATE TRIGGER create_build
AFTER UPDATE ON api.builds
FOR EACH ROW
EXECUTE FUNCTION tg__add_build_job();

View File

@ -0,0 +1,2 @@
ALTER TABLE api.segments
ADD COLUMN bytes_uploaded BIGINT DEFAULT 0;

View File

@ -12,7 +12,7 @@ import scrapeVtuberData from './scrapeVtuberData.ts'
import { getPlaylistUrl } from './ytdlp.ts'
import { getRandomRoom } from './cb.ts'
import { getPackageVersion } from '@futureporn/utils/file.ts'
import { type GenericApiResponse } from '@futureporn/types'
type VtuberDataRequest = FastifyRequest<{
Querystring: { url: string }
@ -90,7 +90,7 @@ fastify.get('/ytdlp/playlist-url', {
querystring: {
type: 'object',
properties: {
url: {
playlistUrl: {
type: 'string'
}
}
@ -100,7 +100,7 @@ fastify.get('/ytdlp/playlist-url', {
error: { type: 'boolean' },
message: { type: 'string' },
data: { type: 'object', properties: {
url: { type: 'string' }
playlistUrl: { type: 'string' }
}}
}
},
@ -108,9 +108,15 @@ fastify.get('/ytdlp/playlist-url', {
}
}, async (req: VtuberDataRequest, reply) => {
try {
const playlistUrl = await getPlaylistUrl(req.query.url)
console.log(`playlistUrl=${playlistUrl}`)
reply.type('application/json').send(JSON.stringify({ data: { url: playlistUrl } }))
const { data, error, message } = await getPlaylistUrl(req.query.url)
console.log(`playlistUrl=${data.playlistUrl}`)
reply.type('application/json').send(JSON.stringify({
data: {
playlistUrl: data.playlistUrl
},
error: error,
message: message
}))
} catch (e) {
reply.type('application/json').send(JSON.stringify({ data: null, error: e }))
}

View File

@ -1,36 +1,14 @@
import spawnWrapper from './spawnWrapper.ts'
import 'dotenv/config'
import { configs } from './config.ts'
import { type GenericApiResponse } from '@futureporn/types'
import { ExhaustedRetriesError, RoomOfflineError } from '@futureporn/utils/error.ts'
const maxRetries = 3
export class ExhaustedRetriesError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, ExhaustedRetriesError.prototype)
this.name = this.constructor.name
this.message = `ExhaustedRetries: We retried the request the maximum amount of times. maxRetries of ${maxRetries} was reached.`
}
getErrorMessage() {
return this.message
}
}
export class RoomOfflineError extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, RoomOfflineError.prototype)
this.name = this.constructor.name
this.message = `RoomOffline. ${this.message}`
}
getErrorMessage() {
return this.message
}
}
export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = 0): Promise<string|null> {
export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = 0): Promise<GenericApiResponse> {
console.log(`getPlaylistUrl roomUrl=${roomUrl} proxy=${false} retries=${retries}`)
let args = ['-4', '-g', roomUrl]
if (proxy) {
@ -49,7 +27,7 @@ export async function getPlaylistUrl (roomUrl: string, proxy = false, retries =
else throw new ExhaustedRetriesError()
} else if (code === 0 && output.match(/https:\/\/.*\.m3u8/)) {
// this must be an OK result with a playlist
return output.trim()
return { data: { playlistUrl: output.trim() }, error: null, message: 'we got a playlistUrl' }
} else if (code === 1 && output.match(/Room is currently offline/)) {
throw new RoomOfflineError()
} else {