differentiate services vs packages
ci / build (push) Has been cancelled Details

This commit is contained in:
CJ_Clippy 2024-07-27 16:42:09 -08:00
parent 4bc11c027e
commit 287321bb1e
384 changed files with 9070 additions and 599 deletions

View File

@ -27,8 +27,8 @@ velero:
./scripts/velero-create.sh
tilt:
kind get kubeconfig > ~/.kube/kind.yaml
KUBECONFIG=~/.kube/kind.yaml tilt up -f ./Tiltfile
kind get kubeconfig > ~/.kube/futureporn.yaml
KUBECONFIG=~/.kube/futureporn.yaml tilt up -f ./Tiltfile
exoscale:
kubectl apply -f https://raw.githubusercontent.com/exoscale/cert-manager-webhook-exoscale/master/deploy/exoscale-webhook-kustomize/deploy.yaml

View File

@ -12,7 +12,7 @@ secret_settings(
## cert-manager slows down Tilt updates so I prefer to keep it commented unless I specifically need to test certs
## cert-manager loaded using this extension is PAINFULLY SLOW, and it must re-install and re-test every time the Tiltfile changes.
## additionally, it is SYNCRHONOUS, which means nothing else can update until cert-manager is updated. @see https://github.com/tilt-dev/tilt-extensions/pull/90#issuecomment-704381205
## TL;DR: It's much preferred & much faster to use a helm chart for working with cert-manager in every environment.
## TL;DR: This is convenient, but it's much faster to use a helm chart for working with cert-manager.
# load('ext://cert_manager', 'deploy_cert_manager')
# deploy_cert_manager(
# load_to_kind=True,
@ -190,12 +190,7 @@ cmd_button('capture-api:create',
icon_name='send',
text='Start Recording'
)
cmd_button('postgres:graphile',
argv=['sh', './scripts/postgres-test-graphile.sh'],
resource='postgresql-primary',
icon_name='graph',
text='create graphile test job',
)
cmd_button('postgres:graphile',
argv=['sh', './scripts/postgres-test-graphile.sh'],
resource='postgresql-primary',
@ -294,10 +289,11 @@ docker_build(
'./packages/scout',
'./packages/types',
'./packages/utils',
'./services/capture',
],
live_update=[
sync('./packages/capture', '/app'),
],
sync('./packages/capture/dist', '/app/dist'),
]
)

View File

@ -21,17 +21,17 @@ WORKDIR /app
FROM base AS build
## Copy the manifests and lockfiles into the build context
COPY package.json pnpm-lock.yaml pnpm-workspace.yaml .npmrc .
COPY ./packages/capture/package.json ./packages/capture/pnpm-lock.yaml ./packages/capture/
COPY ./services/capture/package.json ./services/capture/pnpm-lock.yaml ./services/capture/
COPY ./packages/scout/package.json ./packages/scout/pnpm-lock.yaml ./packages/scout/
COPY ./packages/types/package.json ./packages/types/pnpm-lock.yaml ./packages/types/
COPY ./packages/utils/package.json ./packages/utils/pnpm-lock.yaml ./packages/utils/
## install npm packages
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --prefer-offline
## Copy in all other project files
COPY ./packages/capture/ ./packages/capture/
## Copy in all project files
COPY ./services/capture/ ./services/capture/
COPY ./packages/scout/ ./packages/scout/
COPY ./packages/types/ ./packages/types/
COPY ./packages/utils/ ./packages/utils/
@ -40,13 +40,12 @@ COPY ./packages/utils/ ./packages/utils/
RUN pnpm run -r build
RUN mkdir -p /prod/capture
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm --filter=@futureporn/capture deploy --prod /prod/capture
RUN ls -la /prod/capture
## start the app with dumb init to spawn the Node.js runtime process
## with signal support
## The mode @futureporn/capture uses when starting is determined by FUNCTION environment variable. (worker|api)
FROM base AS capture
RUN ls -la /usr/local/bin/yt-dlp
ENV HOSTNAME=0.0.0.0 NODE_ENV=production
COPY --from=build /prod/capture .
CMD [ "dumb-init", "node", "dist/index.js" ]

View File

@ -5,15 +5,20 @@
"tilt@latest",
"ctlptl@latest",
"kubectl@latest",
"cmctl@latest"
"cmctl@latest",
"kubernetes-helm@latest",
"k9s@latest",
"ffmpeg@latest",
"yt-dlp@latest"
],
"env": {
"DEVBOX_COREPACK_ENABLED": "true",
"ENV": "development"
"ENV": "development",
"KUBECONFIG": "~/.kube/futureporn.yaml"
},
"shell": {
"init_hook": [
"pnpm install"
"echo Welcome to Futureporn devbox"
],
"scripts": {
"test": [

View File

@ -97,6 +97,202 @@
}
}
},
"ffmpeg@latest": {
"last_modified": "2024-07-24T00:53:51Z",
"resolved": "github:NixOS/nixpkgs/4f02464258baaf54992debfd010a7a3662a25536#ffmpeg",
"source": "devbox-search",
"version": "6.1.1",
"systems": {
"aarch64-darwin": {
"outputs": [
{
"name": "bin",
"path": "/nix/store/7jl394717pnlj5jy8n46jq65sw1gwb20-ffmpeg-6.1.1-bin",
"default": true
},
{
"name": "man",
"path": "/nix/store/ma1ssbkbwrasdgsyp0y3x6jbc72pp9s7-ffmpeg-6.1.1-man",
"default": true
},
{
"name": "out",
"path": "/nix/store/smq7vi0562incbgwf4cbx10i0y46jsbc-ffmpeg-6.1.1"
},
{
"name": "data",
"path": "/nix/store/289ikv8ld8whmixs3r4qd4r878mkjmm9-ffmpeg-6.1.1-data"
},
{
"name": "dev",
"path": "/nix/store/r8y6va82y6libjw065gkn5gr51715gac-ffmpeg-6.1.1-dev"
},
{
"name": "doc",
"path": "/nix/store/yasff9ggma6myg47sm805idfxnz0zkac-ffmpeg-6.1.1-doc"
},
{
"name": "lib",
"path": "/nix/store/pmv9jg541b2valk47vh7q40m1p8xr7ik-ffmpeg-6.1.1-lib"
}
],
"store_path": "/nix/store/7jl394717pnlj5jy8n46jq65sw1gwb20-ffmpeg-6.1.1-bin"
},
"aarch64-linux": {
"outputs": [
{
"name": "bin",
"path": "/nix/store/6ydyry316fcc59iy80zpnxnsrh9f18ki-ffmpeg-6.1.1-bin",
"default": true
},
{
"name": "man",
"path": "/nix/store/k1frp52lx3ycwbdgliwcrmb81zm4n10n-ffmpeg-6.1.1-man",
"default": true
},
{
"name": "dev",
"path": "/nix/store/rdh4mv7fnmj79a4dp9rfqnd711y9inpv-ffmpeg-6.1.1-dev"
},
{
"name": "doc",
"path": "/nix/store/vam8a1591x7bkqjljvwsral2v72xwa77-ffmpeg-6.1.1-doc"
},
{
"name": "lib",
"path": "/nix/store/k2mdb9lh6qjb63pizcc0lv7kzakgvcm1-ffmpeg-6.1.1-lib"
},
{
"name": "out",
"path": "/nix/store/6z17gry0dn1yxr3r36fk87sxnddcjg66-ffmpeg-6.1.1"
},
{
"name": "data",
"path": "/nix/store/qxyrrzdl4svxs3dfszsmi2mky4vrzvfa-ffmpeg-6.1.1-data"
}
],
"store_path": "/nix/store/6ydyry316fcc59iy80zpnxnsrh9f18ki-ffmpeg-6.1.1-bin"
},
"x86_64-darwin": {
"outputs": [
{
"name": "bin",
"path": "/nix/store/84yjd9p94kknxpdn974ksb7y28l6paq4-ffmpeg-6.1.1-bin",
"default": true
},
{
"name": "man",
"path": "/nix/store/r1g3627b14nqpz4aqfp87ba0fh49ar5k-ffmpeg-6.1.1-man",
"default": true
},
{
"name": "data",
"path": "/nix/store/dv5lc67c3xykza11q5pwk4vivnsdswmw-ffmpeg-6.1.1-data"
},
{
"name": "dev",
"path": "/nix/store/f0qcama09w9kri8hqvn0lk89zck4978v-ffmpeg-6.1.1-dev"
},
{
"name": "doc",
"path": "/nix/store/7sg26fama7a6gpdm0kkphblzc2x03dfx-ffmpeg-6.1.1-doc"
},
{
"name": "lib",
"path": "/nix/store/rhq35qgr8yvhygpj24wm14anidf9gmmc-ffmpeg-6.1.1-lib"
},
{
"name": "out",
"path": "/nix/store/31q5qklv5jmv91sjs4ljmq45smi7ngxy-ffmpeg-6.1.1"
}
],
"store_path": "/nix/store/84yjd9p94kknxpdn974ksb7y28l6paq4-ffmpeg-6.1.1-bin"
},
"x86_64-linux": {
"outputs": [
{
"name": "bin",
"path": "/nix/store/wnmy246m582splkkqwpgza390sa4m1k1-ffmpeg-6.1.1-bin",
"default": true
},
{
"name": "man",
"path": "/nix/store/hqq9mmrwrbazfdcsmd2dd3jgpvpsyj5p-ffmpeg-6.1.1-man",
"default": true
},
{
"name": "lib",
"path": "/nix/store/5jynrssm1bvrj3kskwgyyhb2069f8dwv-ffmpeg-6.1.1-lib"
},
{
"name": "out",
"path": "/nix/store/xdb4w2ccvig6020ish7qpl88i8fqg2ai-ffmpeg-6.1.1"
},
{
"name": "data",
"path": "/nix/store/sw8wxzscsnxnvrwqzq4fqxvggcd1xic7-ffmpeg-6.1.1-data"
},
{
"name": "dev",
"path": "/nix/store/p6jd7041xggbkwyfzrgsm8ccj370w1hz-ffmpeg-6.1.1-dev"
},
{
"name": "doc",
"path": "/nix/store/1dfla14f5g5xwmw3w5cjfnwdfr64qw7z-ffmpeg-6.1.1-doc"
}
],
"store_path": "/nix/store/wnmy246m582splkkqwpgza390sa4m1k1-ffmpeg-6.1.1-bin"
}
}
},
"k9s@latest": {
"last_modified": "2024-07-20T09:11:00Z",
"resolved": "github:NixOS/nixpkgs/6e14bbce7bea6c4efd7adfa88a40dac750d80100#k9s",
"source": "devbox-search",
"version": "0.32.5",
"systems": {
"aarch64-darwin": {
"outputs": [
{
"name": "out",
"path": "/nix/store/gcbiad83pqc4xyc3qr85gc7vdzn31yvl-k9s-0.32.5",
"default": true
}
],
"store_path": "/nix/store/gcbiad83pqc4xyc3qr85gc7vdzn31yvl-k9s-0.32.5"
},
"aarch64-linux": {
"outputs": [
{
"name": "out",
"path": "/nix/store/p977rq0i7cqirirnv5gzj4kdvi2gz0av-k9s-0.32.5",
"default": true
}
],
"store_path": "/nix/store/p977rq0i7cqirirnv5gzj4kdvi2gz0av-k9s-0.32.5"
},
"x86_64-darwin": {
"outputs": [
{
"name": "out",
"path": "/nix/store/xsv5smy3931nznpilp2vlva8slzk63ps-k9s-0.32.5",
"default": true
}
],
"store_path": "/nix/store/xsv5smy3931nznpilp2vlva8slzk63ps-k9s-0.32.5"
},
"x86_64-linux": {
"outputs": [
{
"name": "out",
"path": "/nix/store/6zmdvw89ql0ani1zjh2im6wfhm3i0c94-k9s-0.32.5",
"default": true
}
],
"store_path": "/nix/store/6zmdvw89ql0ani1zjh2im6wfhm3i0c94-k9s-0.32.5"
}
}
},
"kubectl@latest": {
"last_modified": "2024-07-07T07:43:47Z",
"resolved": "github:NixOS/nixpkgs/b60793b86201040d9dee019a05089a9150d08b5b#kubectl",
@ -181,6 +377,54 @@
}
}
},
"kubernetes-helm@latest": {
"last_modified": "2024-07-20T09:11:00Z",
"resolved": "github:NixOS/nixpkgs/6e14bbce7bea6c4efd7adfa88a40dac750d80100#kubernetes-helm",
"source": "devbox-search",
"version": "3.15.3",
"systems": {
"aarch64-darwin": {
"outputs": [
{
"name": "out",
"path": "/nix/store/z4w7bnylg9h3f543yrf9bcwkxzfs82z2-kubernetes-helm-3.15.3",
"default": true
}
],
"store_path": "/nix/store/z4w7bnylg9h3f543yrf9bcwkxzfs82z2-kubernetes-helm-3.15.3"
},
"aarch64-linux": {
"outputs": [
{
"name": "out",
"path": "/nix/store/aa4jksq9ljgha8plw5cqyxf60n931dir-kubernetes-helm-3.15.3",
"default": true
}
],
"store_path": "/nix/store/aa4jksq9ljgha8plw5cqyxf60n931dir-kubernetes-helm-3.15.3"
},
"x86_64-darwin": {
"outputs": [
{
"name": "out",
"path": "/nix/store/5gjk6w3agm49ljiwi991ailvmw35zq1j-kubernetes-helm-3.15.3",
"default": true
}
],
"store_path": "/nix/store/5gjk6w3agm49ljiwi991ailvmw35zq1j-kubernetes-helm-3.15.3"
},
"x86_64-linux": {
"outputs": [
{
"name": "out",
"path": "/nix/store/n4p0zh1s8jz9mqf1r1pki23kviq4waa7-kubernetes-helm-3.15.3",
"default": true
}
],
"store_path": "/nix/store/n4p0zh1s8jz9mqf1r1pki23kviq4waa7-kubernetes-helm-3.15.3"
}
}
},
"nodejs@20": {
"last_modified": "2024-07-07T07:43:47Z",
"plugin_version": "0.0.2",
@ -293,6 +537,70 @@
"store_path": "/nix/store/qfv96sjcsslynqbilwj823x8nxvgj5cv-tilt-0.33.17"
}
}
},
"yt-dlp@latest": {
"last_modified": "2024-07-18T22:08:26Z",
"resolved": "github:NixOS/nixpkgs/cfa5366588c940ab6ee3bee399b337175545c664#yt-dlp",
"source": "devbox-search",
"version": "2024.7.16",
"systems": {
"aarch64-darwin": {
"outputs": [
{
"name": "out",
"path": "/nix/store/sb1129rd65qynmf5pqshr2885g54hjdz-python3.12-yt-dlp-2024.7.16",
"default": true
},
{
"name": "dist",
"path": "/nix/store/7c75jyvxvqhnhlb8iv99m0m2gzaz1562-python3.12-yt-dlp-2024.7.16-dist"
}
],
"store_path": "/nix/store/sb1129rd65qynmf5pqshr2885g54hjdz-python3.12-yt-dlp-2024.7.16"
},
"aarch64-linux": {
"outputs": [
{
"name": "out",
"path": "/nix/store/rzr6pr2axf653258rnlrldx540wag1h0-python3.12-yt-dlp-2024.7.16",
"default": true
},
{
"name": "dist",
"path": "/nix/store/js7w9zzcydsf020njvhy3dbvarv4c9qj-python3.12-yt-dlp-2024.7.16-dist"
}
],
"store_path": "/nix/store/rzr6pr2axf653258rnlrldx540wag1h0-python3.12-yt-dlp-2024.7.16"
},
"x86_64-darwin": {
"outputs": [
{
"name": "out",
"path": "/nix/store/q3sqpq348nj1zhlwsfmbwcqnmkfglmlj-python3.12-yt-dlp-2024.7.16",
"default": true
},
{
"name": "dist",
"path": "/nix/store/f6i4bq4fbcd1s7k660fkqr15g0lzrfvx-python3.12-yt-dlp-2024.7.16-dist"
}
],
"store_path": "/nix/store/q3sqpq348nj1zhlwsfmbwcqnmkfglmlj-python3.12-yt-dlp-2024.7.16"
},
"x86_64-linux": {
"outputs": [
{
"name": "out",
"path": "/nix/store/m47znwi2bc09g66j2kn6k7fvfx9cvr38-python3.12-yt-dlp-2024.7.16",
"default": true
},
{
"name": "dist",
"path": "/nix/store/7laiz6ilsx4xzk6xni7yl8g3g04wyl55-python3.12-yt-dlp-2024.7.16-dist"
}
],
"store_path": "/nix/store/m47znwi2bc09g66j2kn6k7fvfx9cvr38-python3.12-yt-dlp-2024.7.16"
}
}
}
}
}

View File

@ -13,5 +13,8 @@
"packageManager": "pnpm@9.5.0",
"dependencies": {
"types": "^0.1.1"
},
"devDependencies": {
"lerna": "^8.1.7"
}
}

View File

@ -3,3 +3,9 @@
Each folder here is an individual node package, each of which can reference each other. One reason we do this is to share utility functions between packages.
See https://pnpm.io/workspaces
These are Typescript internal packages, which means they export typescript, not javascript. The TS-to-JS build step occurs in the package which is doing the import.
Also see ../services/* which are also pnpm packages, with the diffentiation that those are full-fledged programs meant for running
Also see ../pnpm-workspace.yaml for more notes

View File

@ -1,32 +0,0 @@
'use strict'
import fastify, { type FastifyRequest } from 'fastify'
import { getPackageVersion } from '@futureporn/utils'
import graphileWorkerPlugin, { type ExtendedFastifyInstance } from './fastify-graphile-worker-plugin.js'
const version = getPackageVersion('../package.json')
interface RecordBodyType {
url: string;
channel: string;
}
function build(opts: Record<string, any>={}, connectionString: string) {
const app: ExtendedFastifyInstance = fastify(opts)
app.register(graphileWorkerPlugin, { connectionString })
app.get('/', async function (request, reply) {
return { app: '@futureporn/capture', version }
})
app.post('/api/record', async function (request: FastifyRequest<{ Body: RecordBodyType }>, reply) {
const { url, channel } = request.body
console.log(`POST /api/record with url=${url}, channel=${channel}`)
const job = await app.graphile.addJob('record', { url, channel })
return job
})
return app
}
export {
build
}

View File

@ -1,38 +0,0 @@
import { type Helpers } from 'graphile-worker'
import Record from '../Record.ts'
import 'dotenv/config'
if (!process.env.S3_BUCKET_NAME) throw new Error('S3_BUCKET_NAME was undefined in env');
if (!process.env.S3_ENDPOINT) throw new Error('S3_ENDPOINT was undefined in env');
if (!process.env.S3_REGION) throw new Error('S3_REGION was undefined in env');
if (!process.env.S3_ACCESS_KEY_ID) throw new Error('S3_ACCESS_KEY_ID was undefined in env');
if (!process.env.S3_SECRET_ACCESS_KEY) throw new Error('S3_SECRET_ACCESS_KEY was undefined in env');
type Payload = {
url: string;
channel: string;
}
export default async function (payload: Payload, helpers: Helpers): Promise<string> {
const { url, channel } = payload;
helpers.logger.info(`'record' task execution begin with url=${url}, channel=${channel}`);
const bucket = process.env.S3_BUCKET_NAME!
const endpoint = process.env.S3_ENDPOINT!
const region = process.env.S3_REGION!
const accessKeyId = process.env.S3_ACCESS_KEY_ID!
const secretAccessKey = process.env.S3_SECRET_ACCESS_KEY!
const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint })
const inputStream = Record.getFFmpegDownload({ url })
const record = new Record({ inputStream, bucket, s3Client, channel })
await record.start()
return record.id
};

View File

@ -6,10 +6,12 @@
"main": "src/index.ts",
"types": "src/index.ts",
"exports": {
"./*.js": "./src/*.js"
"./*.ts": "./src/*.ts"
},
"scripts": {
"test": "mocha --require ts-node/register src/**/*.spec.ts",
"test": "pnpm run test.unit && pnpm run test.integration",
"test.unit": "mocha --require ts-node/register src/**/*.spec.ts -g unit",
"test.integration": "mocha --require ts-node/register src/**/*.spec.ts -g integration",
"build": "tsup",
"dev": "nodemon --ext js,ts,json,yaml --exec \"node --loader ts-node/esm --disable-warning=ExperimentalWarning ./src/index.ts\"",
"start": "node ./dist/index.js",
@ -51,14 +53,22 @@
},
"packageManager": "pnpm@9.2.0",
"devDependencies": {
"@babel/preset-env": "^7.25.0",
"@babel/preset-typescript": "^7.24.7",
"@futureporn/utils": "workspace:^",
"@jest/globals": "^29.7.0",
"@types/chai": "^4.3.16",
"@types/cheerio": "^0.22.35",
"@types/jest": "^29.5.12",
"@types/mailparser": "^3.4.4",
"@types/mocha": "^10.0.7",
"@types/sinon": "^17.0.3",
"chai": "^5.1.0",
"esmock": "^2.6.7",
"jest": "^29.7.0",
"mocha": "^10.4.0",
"nodemon": "^3.1.4",
"sinon": "^15.2.0",
"ts-node": "^10.9.2",
"tsup": "^8.1.2",
"typescript": "^5.5.3"

File diff suppressed because it is too large Load Diff

View File

@ -3,19 +3,17 @@ import { expect } from 'chai';
import { getInitialRoomDossier, getRandomRoom } from './cb.js'
describe('cb', function () {
describe('integration', function () {
describe('getInitialRoomDossier', function () {
/**
* this is an integration test that fails in CI due to CB blocking IP ranges
* @todo use a proxy or something
*/
xit('should return json', async function () {
this.timeout(1000*16)
it('should return json', async function () {
const dossier = await getInitialRoomDossier('https://chaturbate.com/projektmelody')
expect(dossier).to.have.property('wschat_host')
expect(dossier).to.have.property('wschat_host', 'dossier was missing wschat_host')
})
})
describe('getRandomRoom', function () {
it('should return a Room object of an online room', async function () {
this.timeout(1000*60*2)
this.timeout(1000*16)
const room = await getRandomRoom()
expect(room).to.have.property('url')
expect(room).to.have.property('name')
@ -24,3 +22,4 @@ describe('cb', function () {
})
})
})
})

View File

@ -75,7 +75,11 @@ export async function getInitialRoomDossier(roomUrl: string) {
export async function getRandomRoom(): Promise<Room> {
try {
const res = await fetch('https://chaturbate.com/api/public/affiliates/onlinerooms/?wm=DiPkB&client_ip=request_ip');
const res = await fetch('https://chaturbate.com/api/public/affiliates/onlinerooms/?wm=DiPkB&client_ip=request_ip', {
headers: {
accept: 'application/json'
}
});
const data = await res.json() as ChaturbateOnlineModelsResponse;
if (!data || !Array.isArray(data.results) || data.results.length === 0) {

View File

@ -0,0 +1,9 @@
import icons from './icons.ts'
export default function hello(thing: string) {
if (thing === 'world') {
return icons.world
} else {
return 'hi'
}
}

View File

@ -0,0 +1,4 @@
export default ({
world: '🌏',
sun: '☀️'
})

View File

@ -0,0 +1,30 @@
import { expect } from "chai"
import spawnWrapper from "./spawnWrapper.ts"
import { getRandomRoom } from "./cb.ts"
describe('spawnWrapper', function () {
describe('integration', function () {
this.timeout(1000*8)
let roomUrl: string
this.beforeAll(async function () {
roomUrl = (await getRandomRoom()).url
})
it('should get a playlistUrl of an active stream', async function () {
// the system under test is the network integration
const {code, output} = await spawnWrapper('yt-dlp', ['-g', roomUrl])
expect(code).to.equal(0)
expect(output).to.match(/https:\/\/.*\.m3u8/)
})
// these tests are flaky because the rooms used will not always be in the same state
xit('should handle when the room is offline', async function () {
const {code, output} = await spawnWrapper('yt-dlp', ['-g', 'chaturbate.com/48507961285'])
expect(code).to.equal(1)
expect(output).to.match(/Room is currently offline/)
})
xit('should handle when the room is passworded', async function () {
const {code, output} = await spawnWrapper('yt-dlp', ['-g', 'chaturbate.com/projektmelody'])
expect(code).to.equal(1)
expect(output).to.match(/Unable to find stream URL/)
})
})
})

View File

@ -0,0 +1,34 @@
import child_process from 'node:child_process'
export interface SpawnOutput {
code: number;
output: string;
}
/**
* we have this child_process.spawn wrapper to make testing easier.
* this function is meant to be mocked during unit tests so the function logic can be tested
* without making a network request.
*/
export default async function spawnWrapper (command: string, args: string[]): Promise<SpawnOutput> {
console.log(`spawnWrapper command=${command}, args=${JSON.stringify(args, null, 2)}`)
return new Promise((resolve, reject) => {
let output = '';
const process = child_process.spawn(command, args)
process.on('exit', function (code) {
if (code === undefined || code === null) throw new Error('process exited without an exit code');
resolve({ code, output })
})
process.stdout.on('data', (data) => {
output += data
})
process.stderr.on('data', (data) => {
output += data
})
process.on('error', function (e) {
reject(e)
})
})
}

View File

@ -0,0 +1,53 @@
import { getRandomRoom } from './cb.ts'
import { expect } from 'chai'
import esmock from 'esmock'
import { mock } from 'node:test'
describe('esmock integration', function () {
// sanity test to ensure esmock functionality doesn't break
// here we are overriding the hello.ts module's functionality
// normally it would return an Earth emoji with Asia visible.
it('should return a planet Earth emoji with Americas visible', async function () {
const hello = await esmock('./hello.ts', {
'./icons': { world: '🌎' }
})
expect(hello('world')).to.equal('🌎')
expect(hello()).to.equal('hi')
})
})
describe('ytdlp', function () {
describe('integration', function () {
let roomUrl: string;
this.beforeAll(async function () {
roomUrl = (await getRandomRoom()).url
})
})
describe('unit', function () {
it('should handle 403s by using a proxy', async function () {
this.timeout(2000)
const ytdlpErrorRequestForbidden = "ERROR: [Chaturbate] projektmelody: Unable to download webpage: HTTP Error 403: Forbidden (caused by <HTTPError 403: 'Forbidden'>); please report this issue on https://github.com/yt-dlp/yt-dlp/issues?q= , filling out the appropriate issue template. Confirm you are on the latest version using yt-dlp -U"
const requestSimulator = mock.fn(() => {
if (requestSimulator.mock.calls.length === 0) {
return {
code: 1,
output: ytdlpErrorRequestForbidden
}
} else {
return {
code: 0,
output: 'https://example.com/playlist.m3u8'
}
}
})
const ytdlp = await esmock('./ytdlp.ts', {
// simulate a yt-dlp request getting blocked by Cloudflare
'./spawnWrapper.ts': requestSimulator
})
const url = await ytdlp.getPlaylistUrl('chaturbate.com/projektmelody')
expect(url).to.match(/https:\/\/.*\.m3u8/)
})
})
})

View File

@ -0,0 +1,60 @@
import spawnWrapper from './spawnWrapper.ts'
import 'dotenv/config'
const maxRetries = 3
export class ExhaustedRetries extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, ExhaustedRetries.prototype)
}
getErrorMessage() {
return `ExhaustedRetries: We retried the request the maximum amount of times. maxRetries of ${maxRetries} was reached.`
}
}
export class RoomOffline extends Error {
constructor(message?: string) {
super(message)
Object.setPrototypeOf(this, ExhaustedRetries.prototype)
}
getErrorMessage() {
return `RoomOffline. ${this.message}`
}
}
export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = 0): Promise<string> {
console.log(`getPlaylistUrl roomUrl=${roomUrl}, proxy=${false}, retries=${retries}`)
let args = ['-g', roomUrl]
if (proxy) {
if (!process.env.HTTP_PROXY) throw new Error('HTTP_PROXY is undefined in env');
args = args.concat(['--proxy', process.env.HTTP_PROXY!])
}
const { code, output } = await spawnWrapper('yt-dlp', args)
if (output.match(/HTTP Error 403/)) {
// we were likely blocked by Cloudflare
// we make the request a second time, this time via proxy
if (retries < maxRetries) return getPlaylistUrl(roomUrl, true, retries+=1);
else throw new ExhaustedRetries();
} else if (output.match(/Unable to find stream URL/)) {
// sometimes this happens. a retry is in order.
if (retries < maxRetries) return getPlaylistUrl(roomUrl, proxy, retries+=1);
else throw new ExhaustedRetries()
} else if (code === 0 && output.match(/https:\/\/.*\.m3u8/)) {
// this must be an OK result with a playlist
return output
} else if (code === 1 && output.match(/Room is currently offline/)) {
throw new RoomOffline()
} else {
console.error('exotic scenario')
const msg = `We encountered an exotic scenario where code=${code} and output=${output}. Admin: please patch the code to handle this scenario.`
console.error(msg)
throw new Error(msg)
}
}
export default {
getPlaylistUrl
}

View File

@ -1,5 +1,7 @@
{
"compilerOptions": {
"noEmit": true,
"allowImportingTsExtensions": true,
// Base Options recommended for all projects
"esModuleInterop": true,
"skipLibCheck": true,
@ -22,9 +24,8 @@
},
// Include the necessary files for your project
"include": [
"**/*.ts",
"**/*.tsx"
],
"**/*.ts"
, "src/ytdlp.spec.ts", "src/ytdlp.js" ],
"exclude": [
"node_modules"
]

View File

@ -1,16 +0,0 @@
/**
* This file was automatically generated by Strapi.
* Any modifications made will be discarded.
*/
import strapiCloud from "@strapi/plugin-cloud/strapi-admin";
import i18N from "@strapi/plugin-i18n/strapi-admin";
import usersPermissions from "@strapi/plugin-users-permissions/strapi-admin";
import { renderAdmin } from "@strapi/strapi/admin";
renderAdmin(document.getElementById("strapi"), {
plugins: {
"strapi-cloud": strapiCloud,
i18n: i18N,
"users-permissions": usersPermissions,
},
});

View File

@ -1,62 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<!--
This file was automatically generated by Strapi.
Any modifications made will be discarded.
-->
<head>
<meta charset="utf-8" />
<meta
name="viewport"
content="width=device-width, initial-scale=1, viewport-fit=cover"
/>
<meta name="robots" content="noindex" />
<meta name="referrer" content="same-origin" />
<title>Strapi Admin</title>
<style>
html,
body,
#strapi {
height: 100%;
}
body {
margin: 0;
-webkit-font-smoothing: antialiased;
}
</style>
</head>
<body>
<div id="strapi"></div>
<noscript
><div class="strapi--root">
<div class="strapi--no-js">
<style type="text/css">
.strapi--root {
position: absolute;
top: 0;
right: 0;
left: 0;
bottom: 0;
background: #fff;
}
.strapi--no-js {
position: absolute;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
text-align: center;
font-family: helvetica, arial, sans-serif;
}
</style>
<h1>JavaScript disabled</h1>
<p>
Please
<a href="https://www.enable-javascript.com/">enable JavaScript</a>
in your browser and reload the page to proceed.
</p>
</div>
</div></noscript
>
</body>
</html>

View File

@ -17,8 +17,9 @@
"@futureporn/scout": "workspace:*",
"@paralleldrive/cuid2": "^2.2.2",
"@types/node": "^20.14.9",
"@types/slug": "^5.0.8",
"p-retry": "^5.1.2",
"slugify": "^1.6.6"
"slug": "^9.1.0"
},
"devDependencies": {
"@types/chai": "^4.3.16",

View File

@ -17,12 +17,15 @@ importers:
'@types/node':
specifier: ^20.14.9
version: 20.14.11
'@types/slug':
specifier: ^5.0.8
version: 5.0.8
p-retry:
specifier: ^5.1.2
version: 5.1.2
slugify:
specifier: ^1.6.6
version: 1.6.6
slug:
specifier: ^9.1.0
version: 9.1.0
devDependencies:
'@types/chai':
specifier: ^4.3.16
@ -228,6 +231,9 @@ packages:
'@types/retry@0.12.1':
resolution: {integrity: sha512-xoDlM2S4ortawSWORYqsdU+2rxdh4LRW9ytc3zmT37RIKQh6IHyKwwtKhKis9ah8ol07DCkZxPt8BBvPjC6v4g==}
'@types/slug@5.0.8':
resolution: {integrity: sha512-mblTWR1OST257k1gZ3QvqG+ERSr8Ea6dyM1FH6Jtm4jeXi0/r0/95VNctofuiywPxCVQuE8AuFoqmvJ4iVUlXQ==}
acorn-walk@8.3.3:
resolution: {integrity: sha512-MxXdReSRhGO7VlFe1bRG/oI7/mdLV9B9JJT0N8vZOhF7gFRR5l3M8W9G8JxmKV+JC5mGqJ0QvqfSOLsCPa4nUw==}
engines: {node: '>=0.4.0'}
@ -524,9 +530,9 @@ packages:
serialize-javascript@6.0.2:
resolution: {integrity: sha512-Saa1xPByTTq2gdeFZYLLo+RFE35NHZkAbqZeWNd3BpzppeVisAqpDjcp8dyf6uIvEqJRd46jemmyA4iFIeVk8g==}
slugify@1.6.6:
resolution: {integrity: sha512-h+z7HKHYXj6wJU+AnS/+IH8Uh9fdcX1Lrhg1/VMdf9PwoBQXFcXiAdsy2tSK0P6gKwJLXp02r90ahUCqHk9rrw==}
engines: {node: '>=8.0.0'}
slug@9.1.0:
resolution: {integrity: sha512-ioOsCfzQSu+D6NZ8XMCR8IW9FgvF8W7Xzz56hBkB/ALvNaWeBs2MUvvPugq3GCrxfHPFeK6hAxGkY/WLnfX2Lg==}
hasBin: true
string-width@4.2.3:
resolution: {integrity: sha512-wKyQRQpjJ0sIp62ErSZdGsjMJWsap5oRNihHhu6G7JVO/9jIB6UyevL+tXuOqrng8j/cxKTWyWUwvSTriiZz/g==}
@ -724,6 +730,8 @@ snapshots:
'@types/retry@0.12.1': {}
'@types/slug@5.0.8': {}
acorn-walk@8.3.3:
dependencies:
acorn: 8.12.1
@ -1012,7 +1020,7 @@ snapshots:
dependencies:
randombytes: 2.1.0
slugify@1.6.6: {}
slug@9.1.0: {}
string-width@4.2.3:
dependencies:

View File

@ -1,30 +1,36 @@
import slugify from './slugifyFix.js';
import slug from 'slug';
import os from 'node:os';
import fs from 'node:fs';
import { createId } from '@paralleldrive/cuid2';
import { ua0 } from '@futureporn/scout/ua.js';
import { Readable } from 'stream';
import { finished } from 'stream/promises';
import pRetry from 'p-retry';
import { dirname, basename, join, isAbsolute } from 'node:path';
import { fileURLToPath } from 'url';
export const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(fileURLToPath(import.meta.url));
const ua0 = 'Mozilla/5.0 (X11; Linux x86_64; rv:105.0) Gecko/20100101 Firefox/105.0'
export function getPackageVersion(packageJsonPath: string): string {
if (!packageJsonPath) throw new Error('getPackageVersion requires packageJsonPath as first argument, but it was undefined.');
if (!isAbsolute(packageJsonPath)) {
packageJsonPath = join(__dirname, packageJsonPath)
}
try {
const raw = fs.readFileSync(packageJsonPath, { encoding: 'utf-8' })
const json = JSON.parse(raw)
return json.version
} catch (e) {
console.error('failed to getPackageVersion')
console.error(e)
return 'IDK'
}
}
export function fpSlugify(str: string): string {
return slugify(str, {
return slug(str, {
replacement: '-',
lower: true,
strict: true,
locale: 'en',
trim: true,
});

View File

@ -1,8 +0,0 @@
/**
* Hack to make 'slugify' import work with "type": "module".
* @see https://github.com/simov/slugify/issues/173
*/
import slugify from 'slugify'
export default slugify as unknown as typeof slugify.default

View File

@ -1,5 +1,6 @@
{
"compilerOptions": {
"noEmit": true,
// Base Options recommended for all projects
"esModuleInterop": true,
"skipLibCheck": true,
@ -12,9 +13,9 @@
"strict": true,
"noUncheckedIndexedAccess": true,
"noImplicitOverride": true,
"allowImportingTsExtensions": true,
// Transpile our TypeScript code to JavaScript
"module": "NodeNext",
"outDir": "dist",
"lib": [
"es2022"
]

File diff suppressed because it is too large Load Diff

View File

@ -40,8 +40,8 @@ EOF
kubectl --namespace futureporn delete secret capture --ignore-not-found
kubectl --namespace futureporn create secret generic capture \
--from-literal=databaseUrl=${WORKER_DATABASE_URL} \
--from-literal=s3AccessKeyId=${S3_ACCESS_KEY_ID} \
--from-literal=s3SecretAccessKey=${S3_SECRET_ACCESS_KEY}
--from-literal=s3AccessKeyId=${S3_USC_BUCKET_KEY_ID} \
--from-literal=s3SecretAccessKey=${S3_USC_BUCKET_APPLICATION_KEY}
kubectl --namespace futureporn delete secret mailbox --ignore-not-found
kubectl --namespace futureporn create secret generic mailbox \

View File

@ -10,6 +10,10 @@ if [ -z $POSTGRES_PASSWORD ]; then
exit 5
fi
## Enable pgcrypto (needed by pg-boss)
kubectl -n futureporn exec ${postgres_pod_name} -- env PGPASSWORD=${POSTGRES_PASSWORD} psql -U postgres --command "\
CREATE EXTENSION pgcrypto;"
## Create the temporal databases
kubectl -n futureporn exec ${postgres_pod_name} -- env PGPASSWORD=${POSTGRES_PASSWORD} psql -U postgres --command "\
CREATE DATABASE temporal_visibility \
@ -66,6 +70,17 @@ kubectl -n futureporn exec ${postgres_pod_name} -- env PGPASSWORD=${POSTGRES_PAS
IS_TEMPLATE = False;"
## Create PgBoss db (for backend tasks)
kubectl -n futureporn exec ${postgres_pod_name} -- env PGPASSWORD=${POSTGRES_PASSWORD} psql -U postgres --command "\
CREATE DATABASE pgboss \
WITH \
OWNER = postgres \
ENCODING = 'UTF8' \
LOCALE_PROVIDER = 'libc' \
CONNECTION LIMIT = -1 \
IS_TEMPLATE = False;"
## create futureporn user
kubectl -n futureporn exec ${postgres_pod_name} -- env PGPASSWORD=${POSTGRES_PASSWORD} psql -U postgres --command "\
CREATE ROLE futureporn \
@ -84,6 +99,8 @@ kubectl -n futureporn exec ${postgres_pod_name} -- env PGPASSWORD=${POSTGRES_PAS
## grant futureporn user all privs
kubectl -n futureporn exec ${postgres_pod_name} -- env PGPASSWORD=${POSTGRES_PASSWORD} psql -U postgres --command "\
GRANT ALL PRIVILEGES ON DATABASE graphile_worker TO futureporn;"
kubectl -n futureporn exec ${postgres_pod_name} -- env PGPASSWORD=${POSTGRES_PASSWORD} psql -U postgres --command "\
GRANT ALL PRIVILEGES ON DATABASE pgboss TO futureporn;"
## import schema

9
services/README.md Normal file
View File

@ -0,0 +1,9 @@
# Futureporn node services
Each folder here is an individual node package
See https://pnpm.io/workspaces
Also see ../packages/* which are also pnpm packages, with the diffentiation that those are utility libraries meant for importing
Also see ../pnpm-workspace.yaml for more notes

View File

@ -1,15 +1,18 @@
{
"name": "@futureporn/capture",
"type": "module",
"version": "0.2.12",
"version": "0.3.5",
"license": "Unlicense",
"private": true,
"packageManager": "pnpm@9.5.0",
"scripts": {
"start": "node dist/index.js",
"build": "tsup",
"test": "mocha",
"integration": "FUTUREPORN_WORKDIR=/home/cj/Downloads mocha ./integration/**/*.test.js",
"dev": "tsx --watch ./src/index.ts",
"dev.nodemon": "pnpm nodemon --ext ts,json,yaml --ignore ./dist --watch ./src --watch ./node_modules/@futureporn --exec \"pnpm run dev.build\"",
"dev.build": "pnpm run build && pnpm run start",
"clean": "rm -rf dist",
"superclean": "rm -rf node_modules && rm -rf pnpm-lock.yaml && rm -rf dist"
},
@ -27,6 +30,7 @@
"diskusage": "^1.2.0",
"dotenv": "^16.4.5",
"execa": "^6.1.0",
"exponential-backoff": "^3.1.1",
"fastify": "^4.28.1",
"fastify-plugin": "^4.5.1",
"fastq": "^1.17.1",
@ -39,6 +43,7 @@
"ioredis": "^5.4.1",
"minimatch": "^5.1.6",
"p-retry": "^5.1.2",
"pg-boss": "^9.0.3",
"pino-pretty": "^11.2.1",
"postgres": "^3.4.4",
"rxjs": "^7.8.1",
@ -65,6 +70,7 @@
"sinon": "^15.2.0",
"sinon-chai": "^3.7.0",
"sinon-test": "^3.1.6",
"ts-node": "^10.9.2",
"tsup": "^8.1.2",
"tsx": "^4.16.2",
"typescript": "^5.5.3"

View File

@ -1,20 +1,20 @@
import { createId } from '@paralleldrive/cuid2'
import { spawn } from 'child_process';
import { ua0 } from '@futureporn/scout/ua.js'
import { PassThrough, pipeline, Readable } from 'stream';
import { PassThrough, pipeline, Readable, Writable } from 'stream';
import prettyBytes from 'pretty-bytes';
import { Upload } from "@aws-sdk/lib-storage";
import { S3Client } from "@aws-sdk/client-s3";
import 'dotenv/config'
import { createWriteStream } from 'fs';
const ua0 = 'Mozilla/5.0 (X11; Linux x86_64; rv:105.0) Gecko/20100101 Firefox/105.0'
export interface RecordArgs {
filename?: string;
channel: string;
s3Client: S3Client;
bucket: string;
date?: string;
inputStream: Readable;
jobId: string;
}
interface MakeS3ClientOptions {
@ -24,45 +24,39 @@ interface MakeS3ClientOptions {
endpoint: string
}
interface getFFmpegDownloadOptions {
interface getFFmpegOptions {
url: string;
}
export default class Record {
readonly id: string;
private s3Client: S3Client;
private uploadStream: PassThrough;
private ticker?: NodeJS.Timeout;
inputStream: Readable;
counter: number;
bucket: string;
keyName: string;
datestamp: string;
filename?: string;
channel: string;
jobId: string;
date?: string;
// saveToDiskStream: Writable;
constructor({ inputStream, channel, s3Client, bucket }: RecordArgs) {
constructor({ inputStream, s3Client, bucket, jobId }: RecordArgs) {
if (!inputStream) throw new Error('Record constructor was missing inputStream.');
if (!bucket) throw new Error('Record constructor was missing bucket.');
if (!channel) throw new Error('Record constructer was missing channel!');
if (!jobId) throw new Error('Record constructer was missing jobId!');
if (!s3Client) throw new Error('Record constructer was missing s3Client');
this.inputStream = inputStream
this.id = createId()
this.s3Client = s3Client
this.bucket = bucket
this.channel = channel
this.jobId = jobId
this.counter = 0
this.datestamp = new Date().toISOString()
this.keyName = `${this.datestamp}-${channel}-${createId()}.ts`
this.keyName = `${this.datestamp}-${jobId}.ts`
this.uploadStream = new PassThrough()
// this.saveToDiskStream = createWriteStream('/tmp/idk.ts') // @todo delete this line
}
makeProgressTicker() {
this.ticker = setInterval(() => {
console.log(`[progress] ${this.counter} bytes (aggregate) (${prettyBytes(this.counter)}) have passed through the pipeline.`)
}, 1000 * 30)
}
static makeS3Client({
@ -82,9 +76,8 @@ export default class Record {
return client
}
static getFFmpegDownload({ url }: getFFmpegDownloadOptions): Readable {
static getFFmpegStream({ url }: getFFmpegOptions): Readable {
console.log(`getFFmpegStream using url=${url}`)
const ffmpegProc = spawn('ffmpeg', [
'-headers', `"User-Agent: ${ua0}"`,
'-i', url,
@ -100,17 +93,19 @@ export default class Record {
stdio: ['pipe', 'pipe', 'ignore']
})
return ffmpegProc.stdout
}
// async saveToDisk() {
// return new Promise((resolve, reject) => {
// this.saveToDiskStream.once('exit', resolve)
// this.saveToDiskStream.once('error', reject)
// })
// }
async uploadToS3() {
const target = {
Bucket: this.bucket,
Key: this.keyName,
// We do this to keep TS happy. Body expects a Readable, not a ReadableStream nor a NodeJS.ReadableStream
// Body: new Readable().wrap(this.uploadStream)
Body: this.uploadStream
}
@ -126,7 +121,6 @@ export default class Record {
parallelUploads3.on("httpUploadProgress", (progress) => {
console.log(progress)
if (progress?.loaded) {
console.log(`loaded ${progress.loaded} bytes (${prettyBytes(progress.loaded)})`);
} else {
@ -134,12 +128,14 @@ export default class Record {
}
});
console.log('awaiting parallelUploads3.done()...')
await parallelUploads3.done();
console.log('parallelUploads3.done() is complete.')
} catch (e) {
if (e instanceof Error) {
console.error(`while uploading a file to s3, we encountered an error`)
throw new Error(e.message);
console.error(`We were uploading a file to S3 but then we encountered an error! ${JSON.stringify(e, null, 2)}`)
throw e
} else {
throw new Error(`error of some sort ${JSON.stringify(e, null, 2)}`)
}
@ -150,18 +146,48 @@ export default class Record {
async start() {
this.makeProgressTicker()
// @todo remove this
// @todo remove this -- this is test code to validate one stream at a time. here we are saving to disk
// @todo remove this
// streams setup
this.uploadStream.on('data', (data) => {
this.counter += data.length
if (this.counter % (1 * 1024 * 1024) <= 1024) {
console.log(`Received ${this.counter} bytes (${prettyBytes(this.counter)})`);
}
})
this.uploadStream.on('close', () => {
console.log('[!!!] upload stream has closed')
})
this.uploadStream.on('error', (e) => {
console.error('there was an error on the uploadStream. error as follows')
console.error(e)
})
// T.M.I.
// this.uploadStream.on('drain', () => {
// console.info('[vvv] drain on uploadStream.')
// })
// input stream event handlers
this.inputStream.on('close', () => {
console.log('[!!!] input stream has closed.')
})
this.inputStream.on('error', (e) => {
console.error('there was an error on the inputStream. error as follows')
console.error(e)
})
this.inputStream.on('drain', () => {
console.info('[vvv] drain on inputStream.')
})
// stream pipeline setup
// pipe the ffmpeg stream to the S3 upload stream
// this has the effect of uploading the stream to S3 at the same time we're recording it.
pipeline(
this.inputStream,
this.uploadStream,
// this.saveToDiskStream, // @todo delete this test code
this.uploadStream, // @todo restore this code
(err) => {
if (err) {
console.error(`pipeline errored.`)
@ -172,13 +198,15 @@ export default class Record {
}
)
// await this.saveToDisk()
console.log('awaiting uploadToS3()...')
await this.uploadToS3()
clearInterval(this.ticker)
console.log('uploadToS3() is complete.')
return {
id: this.id,
keyName: this.keyName,
channel: this.channel
jobId: this.jobId,
keyName: this.keyName
}
}

View File

@ -1,6 +1,6 @@
'use strict'
import { build } from './app.js'
import { build } from './app.ts'
import chai, { expect } from "chai"
import sinonChai from 'sinon-chai'
import sinon from 'sinon'
@ -77,7 +77,7 @@ describe('app', function () {
expect(JSON.parse(response.body)).to.have.property('fileSize')
expect(JSON.parse(response.body)).to.have.property('outputUrl')
})
xit('DELETE -- delete a record', async function () {
it('DELETE -- delete a record', async function () {
const response = await app.inject({
method: 'DELETE',
url: '/api/record'

View File

@ -0,0 +1,45 @@
'use strict'
import fastify, { type FastifyRequest } from 'fastify'
import { getPackageVersion } from '@futureporn/utils'
import pgbossPlugin, { type ExtendedFastifyInstance } from './fastify-pgboss-plugin.ts'
import PgBoss from 'pg-boss'
import { join, dirname } from 'node:path'
import { fileURLToPath } from 'node:url'
const __dirname = dirname(fileURLToPath(import.meta.url));
const version = getPackageVersion(join(__dirname, '../package.json'))
interface RecordBodyType {
url: string;
channel: string;
}
const build = function (opts: Record<string, any>={}, boss: PgBoss) {
const app: ExtendedFastifyInstance = fastify(opts)
app.register(pgbossPlugin, { boss })
app.get('/', async function (request, reply) {
return { app: '@futureporn/capture', version }
})
app.post('/api/record', async function (request: FastifyRequest<{ Body: RecordBodyType }>, reply) {
const { url, channel } = request.body
console.log(`POST /api/record with url=${url}`)
if (app?.boss) {
const jobId = await app.boss.send('record', {
url,
channel
})
return { jobId }
} else {
console.error(`app.boss was missing! Is the pgboss plugin registered to the fastify instance?`)
}
return { 'idk': true }
})
return app
}
export {
build
}

View File

@ -0,0 +1,31 @@
import PgBoss from 'pg-boss';
async function readme() {
const boss = new PgBoss('postgres://william:mysecretpassword@localhost:5435/william');
boss.on('error', (error: Error) => console.error(error));
await boss.start();
const queue = 'some-queue';
let jobId = await boss.send(queue, { param1: 'foo' })
console.log(`created job in queue ${queue}: ${jobId}`);
await boss.work(queue, someAsyncJobHandler);
}
async function someAsyncJobHandler(job: any) {
console.log(`job ${job.id} received with data:`);
console.log(JSON.stringify(job.data));
await new Promise((resolve, reject) => {
console.log('waiting 3s')
setTimeout(() => {
resolve(job.data)
}, 3000)
});
}
readme()

View File

@ -1,6 +1,6 @@
import { type FastifyInstance } from 'fastify'
import fp from 'fastify-plugin'
import { makeWorkerUtils } from 'graphile-worker'
import { type WorkerUtils, makeWorkerUtils } from 'graphile-worker'
type Options = {
connectionString: string;
@ -8,7 +8,7 @@ type Options = {
export interface ExtendedFastifyInstance extends FastifyInstance {
graphile?: any
graphile?: WorkerUtils
}
async function graphileWorkerPlugin (fastify: ExtendedFastifyInstance, opts: Options) {

View File

@ -0,0 +1,22 @@
import { type FastifyInstance } from 'fastify'
import fp from 'fastify-plugin'
import PgBoss from 'pg-boss'
type Options = {
boss: PgBoss;
}
export interface ExtendedFastifyInstance extends FastifyInstance {
boss?: PgBoss
}
async function pgbossPlugin (fastify: ExtendedFastifyInstance, opts: Options) {
if (!fastify.boss) {
if (!opts.boss) throw new Error('pgbossPlugin requires boss passed in options argument, but it was missing');
const boss = opts.boss
fastify.decorate('boss', boss)
}
}
export default fp(pgbossPlugin)

View File

@ -1,21 +1,26 @@
'use strict'
import { build } from './app.js'
import { build } from './app.ts'
import 'dotenv/config'
import { run } from 'graphile-worker'
import PgBoss, { Job } from 'pg-boss'
import { dirname } from 'node:path';
import { fileURLToPath } from 'url';
import record, { type RecordJob } from './tasks/record.ts'
const __dirname = dirname(fileURLToPath(import.meta.url));
if (!process.env.DATABASE_URL) throw new Error('DATABASE_URL is missing in env');
if (!process.env.FUNCTION) throw new Error(`FUNCTION env var was missing. FUNCTION env var must be either 'api' or 'worker'.`);
const connectionString = process.env.DATABASE_URL!
const concurrency = (process.env?.WORKER_CONCURRENCY) ? parseInt(process.env.WORKER_CONCURRENCY) : 1
async function api() {
async function api(boss: PgBoss) {
if (!process.env.PORT) throw new Error('PORT is missing in env');
console.log(`api FUNCTION listening on PORT ${process.env.PORT}`)
const PORT = parseInt(process.env.PORT!)
const fastifyOpts = {
@ -26,7 +31,8 @@ async function api() {
}
}
}
const server = build(fastifyOpts, connectionString)
const server = build(fastifyOpts, boss)
server.listen({ port: PORT }, (err) => {
if (err) {
@ -34,44 +40,40 @@ async function api() {
process.exit(1)
}
})
}
async function worker() {
const concurrency = (process.env?.WORKER_CONCURRENCY) ? parseInt(process.env.WORKER_CONCURRENCY) : 1
// Run a worker to execute jobs:
const runner = await run({
connectionString,
concurrency,
// Install signal handlers for graceful shutdown on SIGINT, SIGTERM, etc
noHandleSignals: false,
pollInterval: 1000,
taskDirectory: `${__dirname}/tasks`,
});
// Immediately await (or otherwise handle) the resulting promise, to avoid
// "unhandled rejection" errors causing a process crash in the event of
// something going wrong. console.log()
await runner.promise;
// If the worker exits (whether through fatal error or otherwise), the above
// promise will resolve/reject.
async function worker(boss: PgBoss) {
const queue = 'record'
const batchSize = 20
const options = {
teamSize: 1,
teamConcurrency: concurrency,
batchSize
}
await boss.work(queue, options, (job: RecordJob[]) => record(job))
}
async function main() {
const boss = new PgBoss({
connectionString
})
boss.on('error', (err: any) => console.error(err))
await boss.start()
if (process.env.FUNCTION === 'api') {
api()
api(boss)
} else if (process.env.FUNCTION === 'worker') {
worker()
worker(boss)
} else {
throw new Error('process.env.FUNCTION must be either api or worker. got '+process.env.FUNCTION)
}
}
main().catch((err) => {
console.error('there was an error!')
console.error(err);
process.exit(1);
});

View File

@ -20,10 +20,8 @@ import { getRandomRoom } from '@futureporn/scout/cb.js'
import { ua0 } from "@futureporn/scout/ua.js";
import { spawn } from "child_process";
import { PassThrough, pipeline } from "stream";
import { type Progress, Upload } from "@aws-sdk/lib-storage";
import { Upload } from "@aws-sdk/lib-storage";
import { S3Client } from "@aws-sdk/client-s3";
import { createWriteStream } from 'fs';
import ffmpeg from 'fluent-ffmpeg'
import { createId } from '@paralleldrive/cuid2';
import prettyBytes from 'pretty-bytes';
import dotenv from 'dotenv'
@ -32,16 +30,16 @@ dotenv.config({
})
if (!process.env.S3_BUCKET_NAME) throw new Error('S3_BUCKET_NAME missing in env');
if (!process.env.S3_BUCKET_KEY_ID) throw new Error('S3_BUCKET_KEY_ID missing in env');
if (!process.env.S3_BUCKET_APPLICATION_KEY) throw new Error('S3_BUCKET_APPLICATION_KEY missing in env');
if (!process.env.S3_ACCESS_KEY_ID) throw new Error('S3_ACCESS_KEY_ID missing in env');
if (!process.env.S3_SECRET_ACCESS_KEY) throw new Error('S3_SECRET_ACCESS_KEY missing in env');
async function main() {
const client = new S3Client({
endpoint: 'https://s3.us-west-000.backblazeb2.com',
region: 'us-west-000',
credentials: {
accessKeyId: process.env.S3_BUCKET_KEY_ID!,
secretAccessKey: process.env.S3_BUCKET_APPLICATION_KEY!
accessKeyId: process.env.S3_ACCESS_KEY_ID!,
secretAccessKey: process.env.S3_SECRET_ACCESS_KEY!
}
});
@ -75,18 +73,18 @@ async function main() {
console.log(`playlistUrl=${playlistUrl}`)
if (!playlistUrl) throw new Error(`failed to get playlistUrl from yt-dlp -g ${randomRoom.url}`);
let debugCounter = 0
let fileOutputStream = createWriteStream('/home/cj/Downloads/outputfile.ts');
// let ffmpegLogStream = createWriteStream('/tmp/ffmpeg-log.txt')
let uploadStream = new PassThrough()
uploadStream.on('data', (data) => {
debugCounter += data.length
console.log(`[data] uploadStream. ${debugCounter} aggregated bytes (${prettyBytes(debugCounter)}).`)
})
uploadStream.on('drain', () => {
console.log('[drain] uploadStream')
if (debugCounter % (1 * 1024 * 1024) < 1024) {
console.log(`Received ${debugCounter} bytes (${prettyBytes(debugCounter)}) [${debugCounter % (1*1024*1024)}]`);
}
})
// uploadStream.on('drain', () => {
// console.log('[drain] uploadStream')
// })
uploadStream.on('close', () => {
console.log(`[close] uploadStream closed`)
})
@ -125,6 +123,8 @@ async function main() {
stdio: ['pipe', 'pipe', 'ignore']
})
console.log('the following is the ffmpegProc.stdout')
console.log(ffmpegProc.stdout.constructor.name)
// we set up a pipeline which has an readable stream (ffmpeg), a transform stream (debug), and a writable stream (s3 Upload)
pipeline(

View File

@ -0,0 +1,76 @@
import Record from '../Record.ts'
import { getPlaylistUrl } from '@futureporn/scout/ytdlp.ts'
import 'dotenv/config'
import { type Job } from 'pg-boss'
import { backOff } from "exponential-backoff"
export interface RecordJob extends Job {
data: {
url: string;
}
}
async function _record (job: RecordJob, retries?: number): Promise<string> {
if (!process.env.S3_BUCKET_NAME) throw new Error('S3_BUCKET_NAME was undefined in env');
if (!process.env.S3_ENDPOINT) throw new Error('S3_ENDPOINT was undefined in env');
if (!process.env.S3_REGION) throw new Error('S3_REGION was undefined in env');
if (!process.env.S3_ACCESS_KEY_ID) throw new Error('S3_ACCESS_KEY_ID was undefined in env');
if (!process.env.S3_SECRET_ACCESS_KEY) throw new Error('S3_SECRET_ACCESS_KEY was undefined in env');
if (!job) throw new Error('Job sent to job worker execution callback was empty!!!');
const { url } = job.data;
console.log(`'record' job ${job!.id} begin with url=${url}`)
const bucket = process.env.S3_BUCKET_NAME!
const endpoint = process.env.S3_ENDPOINT!
const region = process.env.S3_REGION!
const accessKeyId = process.env.S3_ACCESS_KEY_ID!
const secretAccessKey = process.env.S3_SECRET_ACCESS_KEY!
let playlistUrl
try {
playlistUrl = await getPlaylistUrl(url)
console.log(`playlistUrl=${playlistUrl}`)
} catch (e) {
console.error('error during getPlaylistUrl()')
console.error(e)
throw e
}
const jobId = job.id
const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint })
const inputStream = Record.getFFmpegStream({ url: playlistUrl })
const record = new Record({ inputStream, bucket, s3Client, jobId })
await record.start()
console.log(`record job ${job.id} complete`)
return job.id
}
export default async function main (jobs: RecordJob[]): Promise<any> {
// @todo why are we passed multiple jobs? I'm expecting only one.
const backOffOptions = {
numOfAttempts: 5,
startingDelay: 5000,
retry: (e: any, attemptNumber: number) => {
console.log(`Record Job is retrying. Attempt number ${attemptNumber}. e=${JSON.stringify(e, null, 2)}`)
return true
}
}
for (const j of jobs) {
console.log(`record job ${j.id} GO GO GO`)
try {
await backOff(() => _record(j), backOffOptions)
} catch (e) {
console.warn(`record job ${j.id} encountered the following error.`)
console.error(e)
}
console.log(`record job ${j.id} is finished.`)
}
};

View File

@ -5,7 +5,7 @@
"noEmit": true, // tsup does the emissions
"esModuleInterop": true,
"skipLibCheck": true,
"target": "es2022",
"target": "ESNext",
"allowJs": true,
"moduleResolution": "Bundler",
"resolveJsonModule": true,
@ -19,12 +19,12 @@
"module": "ESNext",
"outDir": "dist",
"lib": [
"es2022"
"ESNext"
]
},
// Include the necessary files for your project
"include": [
"**/*.ts"
"src/**/*.ts"
],
"exclude": [
"node_modules"

Some files were not shown because too many files have changed in this diff Show More