capture progress
ci / build (push) Has been cancelled Details

This commit is contained in:
CJ_Clippy 2024-08-01 11:16:35 -08:00
parent 30ec7404bb
commit f1371970ac
29 changed files with 1448 additions and 334 deletions

View File

@ -13,6 +13,16 @@ jobs:
- uses: actions/checkout@v3
name: Check out code
- uses: mr-smithers-excellent/docker-build-push@v6
name: Build futureporn/bot
with:
image: futureporn/bot
tags: latest
registry: gitea.futureporn.net
dockerfile: d.bot.dockerfile
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
- uses: mr-smithers-excellent/docker-build-push@v6
name: Build futureporn/migrations
with:

View File

@ -136,29 +136,22 @@ docker_build(
pull=False,
)
# docker_build(
# 'fp/bot',
# '.',
# only=[
# './.npmrc',
# './package.json',
# './pnpm-lock.yaml',
# './pnpm-workspace.yaml',
# './packages/bot',
# './packages/image',
# './packages/scout',
# './packages/storage',
# './packages/workflows',
# './packages/types',
# './packages/utils',
# ],
# dockerfile='./d.bot.dockerfile',
# target='dev',
# live_update=[
# sync('./packages/bot', '/app'),
# run('cd /app && pnpm i', trigger=['./packages/bot/package.json', './packages/bot/pnpm-lock.yaml'])
# ]
# )
# Build the fp/bot image for local development with Tilt.
# `only` limits the docker build context to just the files the bot service
# needs, so unrelated changes elsewhere in the monorepo do not trigger rebuilds.
docker_build(
'fp/bot',
'.',
only=[
'./.npmrc',
'./package.json',
'./pnpm-lock.yaml',
'./pnpm-workspace.yaml',
'./services/bot',
],
dockerfile='./d.bot.dockerfile',
# the `dev` stage in d.bot.dockerfile runs `pnpm run dev` (tsx --watch)
target='dev',
# sync source changes straight into the running container instead of
# rebuilding the image on every edit
live_update=[
sync('./services/bot', '/app')
]
)
@ -192,13 +185,6 @@ cmd_button('capture-api:create',
text='Start Recording'
)
cmd_button('postgrest:restore',
argv=['./scripts/postgrest.sh'],
resource='postgrest',
icon_name='start',
text='initialize',
)
cmd_button('postgrest:migrate',
argv=['./scripts/postgrest-migrations.sh'],
resource='postgrest',
@ -295,20 +281,19 @@ docker_build(
'fp/capture',
'.',
dockerfile='d.capture.dockerfile',
target='capture',
target='dev',
only=[
'./.npmrc',
'./package.json',
'./pnpm-lock.yaml',
'./pnpm-workspace.yaml',
'./packages/capture',
'./packages/scout',
'./packages/types',
'./packages/utils',
'./services/capture',
],
live_update=[
sync('./packages/capture/dist', '/app/dist'),
sync('./services/capture/dist', '/app/dist'),
],
pull=False,
)
@ -441,11 +426,11 @@ k8s_resource(
# )
# k8s_resource(
# workload='bot',
# labels=['backend'],
# # resource_deps=['strapi'],
# )
k8s_resource(
workload='bot',
labels=['backend'],
resource_deps=['postgrest'],
)
k8s_resource(
workload='capture-api',
port_forwards=['5003'],

View File

@ -0,0 +1,49 @@
---
# ClusterIP Service exposing redis to other workloads in the cluster.
apiVersion: v1
kind: Service
metadata:
  name: redis
  namespace: futureporn
spec:
  type: ClusterIP
  selector:
    app: redis
  ports:
    - name: web
      port: {{ .Values.redis.port }}
      targetPort: http
      protocol: TCP
---
# Single-purpose redis Deployment. Replica count and image come from values
# so environments can pin their own redis version.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis
  namespace: futureporn
  labels:
    app: redis
spec:
  replicas: {{ .Values.redis.replicas }}
  selector:
    matchLabels:
      app: redis
  template:
    metadata:
      labels:
        app: redis
    spec:
      containers:
        - name: redis
          image: "{{ .Values.redis.image }}"
          ports:
            - name: http
              containerPort: {{ .Values.redis.port }}
          # NOTE(review): the PGRST_DB_ANON_ROLE / PGRST_JWT_SECRET env vars
          # previously set here were PostgREST settings copy-pasted into this
          # manifest. The redis image ignores them, and the referenced
          # `redis/jwtSecret` secret key is not created by the secrets script,
          # which would leave the pod in CreateContainerConfigError. Removed.

View File

@ -0,0 +1,58 @@
---
# Deployment for the futureporn Discord bot.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: bot
  namespace: futureporn
  labels:
    app.kubernetes.io/name: bot
spec:
  replicas: {{ .Values.bot.replicas }}
  selector:
    matchLabels:
      app: bot
  template:
    metadata:
      labels:
        app: bot
    spec:
      containers:
        - name: bot
          image: "{{ .Values.bot.imageName }}"
          env:
            # All credentials are read from the `bot` secret, which is created
            # alongside the other cluster secrets (automationUserJwt,
            # discordToken, discordApplicationId, discordChannelId,
            # discordGuildId, workerConnectionString).
            - name: AUTOMATION_USER_JWT
              valueFrom:
                secretKeyRef:
                  name: bot
                  key: automationUserJwt
            - name: DISCORD_TOKEN
              valueFrom:
                secretKeyRef:
                  name: bot
                  key: discordToken
            - name: DISCORD_APPLICATION_ID
              valueFrom:
                secretKeyRef:
                  name: bot
                  key: discordApplicationId
            - name: DISCORD_CHANNEL_ID
              valueFrom:
                secretKeyRef:
                  name: bot
                  key: discordChannelId
            - name: DISCORD_GUILD_ID
              valueFrom:
                secretKeyRef:
                  name: bot
                  key: discordGuildId
            # Postgres connection string used by graphile-worker to enqueue jobs.
            - name: WORKER_CONNECTION_STRING
              valueFrom:
                secretKeyRef:
                  name: bot
                  key: workerConnectionString
          resources:
            limits:
              cpu: 150m
              memory: 256Mi
      # `Always` is the default (and only valid) restartPolicy for Deployment
      # pods; kept here for explicitness.
      restartPolicy: Always

View File

@ -38,11 +38,18 @@ spec:
env:
- name: FUNCTION
value: worker
- name: PGBOSS_URL
- name: WORKER_CONNECTION_STRING
valueFrom:
secretKeyRef:
name: capture
key: pgbossUrl
key: workerConnectionString
- name: AUTOMATION_USER_JWT
valueFrom:
secretKeyRef:
name: postgrest
key: jwtSecret
- name: POSTGREST_URL
value: http://postgrest.futureporn.svc.cluster.local:9000
- name: PORT
value: "{{ .Values.capture.api.port }}"
- name: S3_ENDPOINT
@ -95,11 +102,11 @@ spec:
env:
- name: FUNCTION
value: api
- name: PGBOSS_URL
- name: WORKER_CONNECTION_STRING
valueFrom:
secretKeyRef:
name: capture
key: pgbossUrl
key: workerConnectionString
- name: PORT
value: "{{ .Values.capture.api.port }}"
resources:

View File

@ -1,4 +1,18 @@
---
apiVersion: v1
kind: Service
metadata:
name: postgrest
namespace: futureporn
spec:
type: ClusterIP
selector:
app: postgrest
ports:
- name: web
port: {{ .Values.postgrest.port }}
targetPort: http
protocol: TCP
---

View File

@ -22,7 +22,7 @@ next:
capture:
imageName: fp/capture
worker:
replicas: 1
replicas: 3
api:
port: 5003
replicas: 1

View File

@ -1,36 +1,25 @@
FROM node:20.15 as base
FROM node:20 AS base
ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH"
WORKDIR /app
RUN corepack enable && corepack prepare pnpm@9.5.0 --activate
RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
ENTRYPOINT ["pnpm"]
FROM base AS install
COPY pnpm-lock.yaml .npmrc package.json .
COPY ./packages/bot/ ./packages/bot/
COPY ./packages/types/ ./packages/types/
COPY ./packages/storage/ ./packages/storage/
COPY ./packages/scout/ ./packages/scout/
COPY ./packages/image/ ./packages/image/
COPY ./packages/utils/ ./packages/utils/
COPY ./services/bot/ ./services/bot/
# RUN ls -lash .
# RUN ls -lash ./packages/
# RUN ls -lash ./packages/bot/
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm fetch
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm install --recursive --frozen-lockfile --prefer-offline
# RUN ls -lash .
# RUN ls -la ./packages
FROM install AS build
RUN pnpm -r build
RUN pnpm deploy --filter=bot /prod/bot-dev
RUN pnpm deploy --filter=bot --prod /prod/bot
FROM base AS dev
COPY --from=build /prod/bot-dev .
FROM install AS dev
WORKDIR /app/services/bot
CMD ["run", "dev"]

View File

@ -42,6 +42,12 @@ RUN mkdir -p /prod/capture
RUN --mount=type=cache,id=pnpm,target=/pnpm/store pnpm --filter=@futureporn/capture deploy --prod /prod/capture
FROM build AS dev
WORKDIR /app/services/capture
ENTRYPOINT ["pnpm", "run", "dev"]
## start the app with dumb init to spawn the Node.js runtime process
## with signal support
## The mode @futureporn/capture uses when starting is determined by FUNCTION environment variable. (worker|api)

View File

@ -33,6 +33,15 @@ EOF
# --from-literal=b2Key=${UPPY_B2_KEY} \
# --from-literal=b2Secret=${UPPY_B2_SECRET}\
kubectl --namespace futureporn delete secret bot --ignore-not-found
kubectl --namespace futureporn create secret generic bot \
--from-literal=automationUserJwt=${AUTOMATION_USER_JWT} \
--from-literal=discordToken=${DISCORD_TOKEN} \
--from-literal=discordChannelId=${DISCORD_CHANNEL_ID} \
--from-literal=discordGuildId=${DISCORD_GUILD_ID} \
--from-literal=discordApplicationId=${DISCORD_APPLICATION_ID} \
--from-literal=workerConnectionString=${WORKER_CONNECTION_STRING}
kubectl --namespace futureporn delete secret pgadmin4 --ignore-not-found
kubectl --namespace futureporn create secret generic pgadmin4 \
--from-literal=email=${PGADMIN_DEFAULT_EMAIL} \
@ -45,7 +54,7 @@ kubectl --namespace futureporn create secret generic postgrest \
kubectl --namespace futureporn delete secret capture --ignore-not-found
kubectl --namespace futureporn create secret generic capture \
--from-literal=pgbossUrl=${PGBOSS_URL} \
--from-literal=workerConnectionString=${WORKER_CONNECTION_STRING} \
--from-literal=s3AccessKeyId=${S3_USC_BUCKET_KEY_ID} \
--from-literal=s3SecretAccessKey=${S3_USC_BUCKET_APPLICATION_KEY}
@ -58,22 +67,11 @@ kubectl --namespace futureporn create secret generic mailbox \
--from-literal=imapPassword=${IMAP_PASSWORD} \
--from-literal=imapAccessToken=${IMAP_ACCESS_TOKEN}
kubectl --namespace futureporn delete secret trigger --ignore-not-found
kubectl --namespace futureporn create secret generic trigger \
--from-literal=redisUrl=${TRIGGER_REDIS_URL} \
--from-literal=encryptionKey=${TRIGGER_ENCRYPTION_KEY} \
--from-literal=providerSecret=${TRIGGER_PROVIDER_SECRET} \
--from-literal=coordinatorSecret=${TRIGGER_COORDINATOR_SECRET} \
--from-literal=magicLinkSecret=${TRIGGER_MAGIC_LINK_SECRET} \
--from-literal=sessionSecret=${TRIGGER_SESSION_SECRET} \
--from-literal=databaseUrl=${TRIGGER_DATABASE_URL} \
--from-literal=loginUrl=${TRIGGER_LOGIN_ORIGIN} \
--from-literal=appOrigin=${TRIGGER_APP_ORIGIN}
kubectl --namespace futureporn delete secret discord --ignore-not-found
kubectl --namespace futureporn create secret generic discord \
--from-literal=token=${DISCORD_TOKEN} \
--from-literal=applicationId=${DISCORD_APPLICATION_ID}
# kubectl --namespace futureporn delete secret discord --ignore-not-found
# kubectl --namespace futureporn create secret generic discord \
# --from-literal=token=${DISCORD_TOKEN} \
# --from-literal=applicationId=${DISCORD_APPLICATION_ID}
kubectl --namespace futureporn delete secret redis --ignore-not-found
kubectl --namespace futureporn create secret generic redis \
@ -113,22 +111,9 @@ kubectl --namespace futureporn create secret generic grafana \
--from-literal=admin-password=${GRAFANA_PASSWORD}
kubectl --namespace futureporn delete secret scout --ignore-not-found
kubectl --namespace futureporn create secret generic scout \
--from-literal=recentsToken=${SCOUT_RECENTS_TOKEN} \
--from-literal=strapiApiKey=${SCOUT_STRAPI_API_KEY} \
--from-literal=imapServer=${SCOUT_IMAP_SERVER} \
--from-literal=imapPort=${SCOUT_IMAP_PORT} \
--from-literal=imapUsername=${SCOUT_IMAP_USERNAME} \
--from-literal=imapPassword=${SCOUT_IMAP_PASSWORD} \
--from-literal=imapAccessToken=${SCOUT_IMAP_ACCESS_TOKEN} \
--from-literal=nitterAccessKey=${SCOUT_NITTER_ACCESS_KEY} \
--from-literal=s3BucketKeyId=${S3_BUCKET_KEY_ID} \
--from-literal=s3BucketApplicationKey=${S3_BUCKET_APPLICATION_KEY}
kubectl --namespace futureporn delete secret link2cid --ignore-not-found
kubectl --namespace futureporn create secret generic link2cid \
--from-literal=apiKey=${LINK2CID_API_KEY}
# kubectl --namespace futureporn delete secret link2cid --ignore-not-found
# kubectl --namespace futureporn create secret generic link2cid \
# --from-literal=apiKey=${LINK2CID_API_KEY}
kubectl --namespace cert-manager delete secret vultr --ignore-not-found
kubectl --namespace cert-manager create secret generic vultr \

View File

@ -3,8 +3,8 @@ import type {} from "graphile-worker";
const preset: GraphileConfig.Preset = {
worker: {
connectionString: process.env.DATABASE_URL,
concurrentJobs: 3,
connectionString: process.env.WORKER_CONNECTION_STRING,
concurrentJobs: 5,
fileExtensions: [".js", ".ts"],
},
};

View File

@ -1,29 +1,32 @@
{
"name": "@futureporn/bot",
"type": "module",
"version": "1.0.0",
"description": "",
"version": "1.0.4",
"description": "Futureporn Discord bot",
"main": "dist/index.js",
"scripts": {
"test": "echo \"Warn: no test specified\" && exit 0",
"start": "node ./dist/index.js",
"dev": "nodemon --ext js,ts,json,yaml --exec \"node --loader ts-node/esm --disable-warning=ExperimentalWarning ./src/index.ts\"",
"dev.nodemon": "nodemon --ext js,ts,json,yaml --exec \"node --loader ts-node/esm --disable-warning=ExperimentalWarning ./src/index.ts\"",
"dev": "tsx --watch ./src/index.ts",
"build": "tsc --build",
"clean": "rm -rf dist",
"superclean": "rm -rf node_modules && rm -rf pnpm-lock.yaml && rm -rf dist"
},
"packageManager": "pnpm@9.5.0",
"packageManager": "pnpm@9.6.0",
"keywords": [],
"author": "",
"author": "@CJ_Clippy",
"license": "Unlicense",
"dependencies": {
"discord.js": "^14.15.3",
"dotenv": "^16.4.5",
"pg-boss": "^9.0.3"
"graphile-config": "0.0.1-beta.9",
"graphile-worker": "^0.16.6"
},
"devDependencies": {
"nodemon": "^3.1.4",
"ts-node": "^10.9.2",
"tsx": "^4.16.2",
"typescript": "^5.5.3"
}
}

File diff suppressed because it is too large Load Diff

View File

@ -37,7 +37,7 @@ export default {
data: new SlashCommandBuilder()
.setName('donger')
.setDescription('Replies with a free donger!'),
async execute(interaction: ChatInputCommandInteraction): Promise<void> {
async execute({ interaction }: { interaction: ChatInputCommandInteraction}): Promise<void> {
await interaction.reply({
content: dongers[Math.floor(Math.random()*dongers.length)]
});

View File

@ -9,6 +9,7 @@ import {
import type { ExecuteArguments } from '../../index.js';
if (!process.env.AUTOMATION_USER_JWT) throw new Error(`AUTOMATION_USER_JWT was missing from env`);
export default {
data: new SlashCommandBuilder()
@ -20,9 +21,8 @@ export default {
.setDescription('The channel URL to record')
.setRequired(true)
),
async execute({ interaction, boss }: ExecuteArguments): Promise<void> {
async execute({ interaction, workerUtils }: ExecuteArguments): Promise<void> {
const url = interaction.options.getString('url')
const jobId = await boss.send('record', { url })
// const row = new ActionRowBuilder<ButtonBuilder>()
// .addComponents(component);
@ -59,7 +59,7 @@ export default {
const idk = await interaction.reply({
content: `Recording queued. jobId=${jobId}`,
content: `Recording ${url}`,
embeds: [
statusEmbed
],
@ -71,19 +71,10 @@ export default {
// console.log('the following is idk, the return value from interaction.reply')
// console.log(idk)
const message = await idk.fetch();
console.log('the following is the message, retrieved by awaiting idk.fetch()')
console.log(message)
const message = await idk.fetch()
const discordMessageId = message.id
await workerUtils.addJob('startRecording', { url, discordMessageId })
// create a Discord Interaction in the db which relates the discord message to the record Job.
// we can later use the record to update the status displayed in the discord message
await fetch(`${STRAPI_URL}/api/discord-interaction`)
// 1267182888403861608
// await interaction.reply({ components: [row] });
// await interaction.reply({
// content: `Recording queued. ID ${jobId}`
// });
},
};

View File

@ -2,30 +2,34 @@ import 'dotenv/config'
import { type ChatInputCommandInteraction, Client, Events, GatewayIntentBits, Partials } from 'discord.js'
import loadCommands from './loadCommands.js'
import deployCommands from './deployCommands.js'
import PgBoss from 'pg-boss'
import discordMessageUpdate from './tasks/discordMessageUpdate.js'
import { makeWorkerUtils, type WorkerUtils } from 'graphile-worker'
export interface ExecuteArguments {
interaction: ChatInputCommandInteraction;
boss: PgBoss;
workerUtils: WorkerUtils
}
if (!process.env.AUTOMATION_USER_JWT) throw new Error(`AUTOMATION_USER_JWT was missing from env`);
if (!process.env.DISCORD_TOKEN) throw new Error("DISCORD_TOKEN was missing from env");
if (!process.env.DISCORD_CHANNEL_ID) throw new Error("DISCORD_CHANNEL_ID was missing from env");
if (!process.env.PGBOSS_URL) throw new Error("PGBOSS_URL was missing from env");
const connectionString = process.env.PGBOSS_URL!
if (!process.env.WORKER_CONNECTION_STRING) throw new Error("WORKER_CONNECTION_STRING was missing from env");
async function setupGraphileWorker() {
const workerUtils = await makeWorkerUtils({
connectionString: process.env.WORKER_CONNECTION_STRING!,
});
await workerUtils.migrate()
return workerUtils
}
async function setup(commands: any[]) {
async function setupDiscordBot(commands: any[], workerUtils: WorkerUtils) {
console.log(`setup()`)
if (!commands) throw new Error('commands passed to setup() was missing');
const channelId = '' + process.env.DISCORD_CHANNEL_ID
// setup pg-boss
const boss = new PgBoss({ connectionString })
boss.on('error', (err: any) => console.error(err))
await boss.start()
// Create a new client instance
console.log(`Create a new client instance`)
const client = new Client({
intents: [
GatewayIntentBits.Guilds,
@ -44,13 +48,18 @@ async function setup(commands: any[]) {
if (!interaction.isChatInputCommand()) return;
const { commandName } = interaction;
console.log(`Received interaction with commandName=${commandName}`)
commands.find((c) => c.data.name === commandName).execute({ interaction, boss })
const cmd = commands.find((c) => c.data.name === commandName)
if (!cmd) {
console.log(`no command handler matches commandName=${commandName}`)
return;
}
cmd.execute({ interaction, workerUtils })
});
// When the client is ready, run this code (only once).
// The distinction between `client: Client<boolean>` and `readyClient: Client<true>` is important for TypeScript developers.
// It makes some properties non-nullable.
client.once(Events.ClientReady, readyClient => {
console.log(`Ready! Logged in as ${readyClient.user.tag} coño!`);
console.log(`Ready! Logged in as ${readyClient.user.tag}`);
// client.channels.cache.get(process.env.DISCORD_CHANNEL_ID).send('testing 123');
// readyClient.channels.fetch(channelId).then(channel => {
// channel.send('generic welcome message!')
@ -98,11 +107,17 @@ async function setup(commands: any[]) {
}
async function main() {
console.log(`main()`)
const commands = await loadCommands()
if (!commands) throw new Error('there were no commands available to be loaded.');
await deployCommands(commands.map((c) => c.data.toJSON()))
console.log(`${commands.length} commands deployed: ${commands.map((c) => c.data.name).join(', ')}`)
setup(commands)
const workerUtils = await setupGraphileWorker()
setupDiscordBot(commands, workerUtils)
}
main()
main().catch((e) => {
console.error("error during main() function")
console.error(e)
process.exit(3)
})

View File

@ -13,8 +13,8 @@ export default async function loadCommands(): Promise<any[]> {
for (const folder of commandFolders) {
const commandsPath = path.join(foldersPath, folder);
const commandFiles = fs.readdirSync(commandsPath).filter(file => file.endsWith('.ts'));
// console.log(`commandFiles=${commandFiles}`)
const commandFiles = fs.readdirSync(commandsPath).filter(file => file.endsWith('.ts') || file.endsWith('.js'));
console.log(`commandFiles=${commandFiles}`)
// console.log(`Grab the SlashCommandBuilder#toJSON() output of each command's data for deployment`)
for (const file of commandFiles) {
const filePath = path.join(commandsPath, file);

View File

@ -0,0 +1,106 @@
import 'dotenv/config'
import { type Task, type WorkerUtils } from 'graphile-worker';
import { type ChatInputCommandInteraction, Client, Events, GatewayIntentBits, Partials, Message, TextChannel } from 'discord.js'
// export interface DiscordMessageUpdateJob extends Job {
// data: {
// captureJobId: string;
// }
// }
// Job payload for the discordMessageUpdate task.
interface Payload {
  // Discord message snowflake (as a string) identifying the message to edit.
  discord_message_id: string;
  // Identifier of the capture job whose status is being reported.
  capture_job_id: string;
}
/**
 * Narrow an unknown job payload to `Payload`, throwing a descriptive error
 * when a required field is missing or has the wrong type.
 */
function assertPayload(payload: any): asserts payload is Payload {
  if (typeof payload !== "object" || !payload) throw new Error("invalid payload");
  if (typeof payload.discord_message_id !== "string") throw new Error("invalid discord_message_id");
  // BUG FIX: was `typeof payload.capture_job_id.to`, which rejected every
  // valid payload (a string has no `.to` property, so typeof was always
  // "undefined", never "string").
  if (typeof payload.capture_job_id !== "string") throw new Error("invalid capture_job_id");
}
if (!process.env.AUTOMATION_USER_JWT) throw new Error(`AUTOMATION_USER_JWT was missing from env`);
if (!process.env.DISCORD_TOKEN) throw new Error("DISCORD_TOKEN was missing from env");
if (!process.env.DISCORD_CHANNEL_ID) throw new Error("DISCORD_CHANNEL_ID was missing from env");
if (!process.env.DISCORD_GUILD_ID) throw new Error("DISCORD_GUILD_ID was missing from env");
/**
 * discordMessageUpdate is the task where we edit a previously sent discord
 * message to display new status information sent to us from @futureporn/capture.
 *
 * Sometimes the update is changing the state, one of Pending|Recording|Aborted|Ended.
 * Sometimes the update is updating the Filesize of the recording in-progress.
 * Sometimes the update is adding a thumbnail image to the message.
 *
 * NOTE(review): the actual Discord edit is not implemented yet; this task
 * currently only validates its payload and logs. The earlier draft code for
 * fetching the discord_interactions row via PostgREST and editing the message
 * with discord.js was removed (it remains available in git history).
 */
export default async function discordMessageUpdate (payload: Payload) {
  // BUG FIX: the previous signature declared a generic type parameter
  // `<Task>` that shadowed the imported graphile-worker `Task` type and was
  // never used; it has been removed.
  assertPayload(payload)
  console.log(`discordMessageUpdate job has begun with captureJobId=${payload.capture_job_id}`)
  // TODO: look up the discord_interactions record matching
  // payload.capture_job_id, then use discord.js to edit the message with id
  // payload.discord_message_id to show the new state/filesize/thumbnail.
}

View File

@ -45,12 +45,26 @@ describe('Record', function () {
s3ClientMock.on(CreateMultipartUploadCommand).resolves({UploadId: '1'});
s3ClientMock.on(UploadPartCommand).resolves({ETag: '1'});
const s3Client = new S3Client({ region: 'us-west-000' })
const record = new Record({ inputStream, s3Client, channel: 'coolguy_69', bucket: 'test' })
const jobId = 'test-job-1234'
const record = new Record({ inputStream, s3Client, bucket: 'test', jobId })
await record.start()
expect(record).to.have.property('counter', 192627)
expect(record).to.have.property('bucket', 'test')
})
it('should be abortable', async function () {
const inputStream = createReadStream(join(__dirname, './fixtures/mock-stream0.mp4')) // 192627 bytes
const s3ClientMock = mockClient(S3Client)
const s3Client = new S3Client({ region: 'us-west-000' })
s3ClientMock.on(CreateMultipartUploadCommand).resolves({UploadId: '1'});
s3ClientMock.on(UploadPartCommand).resolves({ETag: '1'});
const jobId = 'test-job-3456'
const record = new Record({ inputStream, s3Client, jobId, bucket: 'test' })
await record.start()
expect(record).to.have.property('abortController')
await record.abort()
})
xit('should restart if a EPIPE is encountered', async function () {
// @todo IDK how to implement this.
const inputStream = createReadStream(join(__dirname, './fixtures/mock-stream0.mp4'))

View File

@ -1,10 +1,9 @@
import { spawn } from 'child_process';
import { PassThrough, pipeline, Readable, Writable } from 'stream';
import { PassThrough, pipeline, Readable } from 'stream';
import prettyBytes from 'pretty-bytes';
import { Upload } from "@aws-sdk/lib-storage";
import { S3Client } from "@aws-sdk/client-s3";
import 'dotenv/config'
import { createWriteStream } from 'fs';
const ua0 = 'Mozilla/5.0 (X11; Linux x86_64; rv:105.0) Gecko/20100101 Firefox/105.0'
@ -128,9 +127,9 @@ export default class Record {
}
});
console.log('awaiting parallelUploads3.done()...')
console.log('Waiting for parallelUploads3 to finish...')
await parallelUploads3.done();
console.log('parallelUploads3.done() is complete.')
console.log('parallelUploads3 is complete.')
} catch (e) {
if (e instanceof Error) {

View File

@ -2,8 +2,7 @@
import fastify, { type FastifyRequest } from 'fastify'
import { getPackageVersion } from '@futureporn/utils'
import pgbossPlugin, { type ExtendedFastifyInstance } from './fastify-pgboss-plugin.ts'
import PgBoss from 'pg-boss'
import fastifyGraphileWorkerPlugin, { type ExtendedFastifyInstance } from './fastify-graphile-worker-plugin.ts'
import { join, dirname } from 'node:path'
import { fileURLToPath } from 'node:url'
@ -12,28 +11,28 @@ const __dirname = dirname(fileURLToPath(import.meta.url));
const version = getPackageVersion(join(__dirname, '../package.json'))
interface RecordBodyType {
url: string;
channel: string;
discordMessageId: string;
}
const build = function (opts: Record<string, any>={}, boss: PgBoss) {
const build = function (opts: Record<string, any>={}, connectionString: string) {
const app: ExtendedFastifyInstance = fastify(opts)
app.register(pgbossPlugin, { boss })
app.register(fastifyGraphileWorkerPlugin, { connectionString })
app.get('/', async function (request, reply) {
return { app: '@futureporn/capture', version }
})
app.post('/api/record', async function (request: FastifyRequest<{ Body: RecordBodyType }>, reply) {
const { url, channel } = request.body
const { url, discordMessageId } = request.body
console.log(`POST /api/record with url=${url}`)
if (app?.boss) {
const jobId = await app.boss.send('record', {
if (app?.graphile) {
const jobId = await app.graphile.addJob('startRecording', {
url,
channel
discordMessageId
})
return { jobId }
} else {
console.error(`app.boss was missing! Is the pgboss plugin registered to the fastify instance?`)
console.error(`app.graphile was missing! Is the graphile worker plugin registered to the fastify instance?`)
}
return { 'idk': true }
})

View File

@ -4,21 +4,33 @@
import { build } from './app.ts'
import 'dotenv/config'
import PgBoss, { Job } from 'pg-boss'
import { dirname } from 'node:path';
import { makeWorkerUtils, type WorkerUtils, Runner, RunnerOptions, run as graphileRun } from 'graphile-worker'
import { join, dirname } from 'node:path';
import { fileURLToPath } from 'url';
import record, { type RecordJob } from './tasks/record.ts'
import { getPackageVersion } from '@futureporn/utils';
import type { GraphileConfig } from "graphile-config";
import type {} from "graphile-worker";
import startRecording from './tasks/startRecording.ts';
import { stopRecording } from './tasks/stopRecording.ts';
import record from './tasks/record.ts'
const __dirname = dirname(fileURLToPath(import.meta.url));
const version = getPackageVersion(join(__dirname, '../package.json'))
if (!process.env.PGBOSS_URL) throw new Error('PGBOSS_URL is missing in env');
if (!process.env.FUNCTION) throw new Error(`FUNCTION env var was missing. FUNCTION env var must be either 'api' or 'worker'.`);
const connectionString = process.env.PGBOSS_URL!
if (!process.env.WORKER_CONNECTION_STRING) throw new Error(`WORKER_CONNECTION_STRING env var was missing`);
const connectionString = process.env.WORKER_CONNECTION_STRING!
const concurrency = (process.env?.WORKER_CONCURRENCY) ? parseInt(process.env.WORKER_CONCURRENCY) : 1
const preset: GraphileConfig.Preset = {
worker: {
connectionString: process.env.WORKER_CONNECTION_STRING,
concurrentJobs: concurrency,
fileExtensions: [".js", ".ts"],
},
};
async function api(boss: PgBoss) {
async function api() {
if (!process.env.PORT) throw new Error('PORT is missing in env');
console.log(`api FUNCTION listening on PORT ${process.env.PORT}`)
const PORT = parseInt(process.env.PORT!)
@ -32,7 +44,7 @@ async function api(boss: PgBoss) {
}
}
const server = build(fastifyOpts, boss)
const server = build(fastifyOpts, connectionString)
server.listen({ port: PORT }, (err) => {
if (err) {
@ -42,42 +54,34 @@ async function api(boss: PgBoss) {
})
}
async function worker(boss: PgBoss) {
const queue = 'record'
const batchSize = 20
const options = {
teamSize: 1,
teamConcurrency: concurrency,
batchSize
async function worker(workerUtils: WorkerUtils) {
const runnerOptions: RunnerOptions = {
preset,
concurrency,
// taskDirectory: join(__dirname, 'tasks'),
taskList: {
'record': record,
'startRecording': startRecording,
'stopRecording': stopRecording
}
}
await boss.work(queue, options, (job: RecordJob[]) => record(job))
const runner = await graphileRun(runnerOptions)
if (!runner) throw new Error('failed to initialize graphile worker');
await runner.promise
}
async function main() {
const boss = new PgBoss({
connectionString
})
boss.on('error', (err: any) => console.error(err))
boss.on('wip', (wip: any) => {
console.log('wip event was received.')
console.log(wip)
})
boss.on('stopped', () => {
console.log('stopped event was received.')
})
boss.on('monitor-states', (data: any) => {
console.log('monitor-states event was received.')
console.log(data)
})
await boss.start()
const workerUtils = await makeWorkerUtils({ connectionString })
await workerUtils.migrate()
console.log(`@futureporn/capture version ${version} (FUNCTION=${process.env.FUNCTION})`)
if (process.env.FUNCTION === 'api') {
api(boss)
api()
} else if (process.env.FUNCTION === 'worker') {
worker(boss)
worker(workerUtils)
} else {
throw new Error('process.env.FUNCTION must be either api or worker. got '+process.env.FUNCTION)
}

View File

@ -1,76 +1,99 @@
import { Helpers, type Task } from 'graphile-worker'
import Record from '../Record.ts'
import { getPlaylistUrl } from '@futureporn/scout/ytdlp.ts'
import 'dotenv/config'
import { type Job } from 'pg-boss'
import { backOff } from "exponential-backoff"
export interface RecordJob extends Job {
data: {
url: string;
}
/**
* url is the URL to be recorded. Ex: chaturbate.com/projektmelody
* recordId is the ID of the record record in postgres
* we use the ID to poll the db to see if the job is aborted by the user
*/
interface Payload {
url: string,
recordId: number
}
async function _record (job: RecordJob, retries?: number): Promise<string> {
interface RecordingRecord {
id: number;
isAborted: boolean;
}
if (!process.env.S3_BUCKET_NAME) throw new Error('S3_BUCKET_NAME was undefined in env');
if (!process.env.S3_ENDPOINT) throw new Error('S3_ENDPOINT was undefined in env');
if (!process.env.S3_REGION) throw new Error('S3_REGION was undefined in env');
if (!process.env.S3_ACCESS_KEY_ID) throw new Error('S3_ACCESS_KEY_ID was undefined in env');
if (!process.env.S3_SECRET_ACCESS_KEY) throw new Error('S3_SECRET_ACCESS_KEY was undefined in env');
/**
 * Runtime guard narrowing an arbitrary job payload to `Payload`.
 * Throws with a field-specific message on the first invalid field.
 */
function assertPayload(payload: any): asserts payload is Payload {
  const isRecord = payload !== null && typeof payload === "object";
  if (!isRecord) {
    throw new Error("invalid payload");
  }
  if (typeof payload.url !== "string") {
    throw new Error("invalid url");
  }
  if (typeof payload.recordId !== "number") {
    throw new Error("invalid recordId");
  }
}
if (!job) throw new Error('Job sent to job worker execution callback was empty!!!');
const { url } = job.data;
console.log(`'record' job ${job!.id} begin with url=${url}`)
/**
 * Ensure every environment variable required by the record task is present.
 * Throws an error naming the first missing variable.
 */
function assertEnv() {
  const requiredVars = [
    'S3_ACCESS_KEY_ID',
    'S3_SECRET_ACCESS_KEY',
    'S3_REGION',
    'S3_ENDPOINT',
    'S3_BUCKET',
    'POSTGREST_URL',
  ];
  for (const name of requiredVars) {
    if (!process.env[name]) throw new Error(`${name} was missing in env`);
  }
}
const bucket = process.env.S3_BUCKET_NAME!
const endpoint = process.env.S3_ENDPOINT!
const region = process.env.S3_REGION!
const accessKeyId = process.env.S3_ACCESS_KEY_ID!
const secretAccessKey = process.env.S3_SECRET_ACCESS_KEY!
let playlistUrl
try {
playlistUrl = await getPlaylistUrl(url)
console.log(`playlistUrl=${playlistUrl}`)
} catch (e) {
console.error('error during getPlaylistUrl()')
console.error(e)
throw e
}
const jobId = job.id
/**
 * Start recording the stream at `url` and return the in-flight Record instance.
 *
 * Fixes: the original body contained interleaved leftovers from a removed file —
 * a duplicate `const record` declaration (a SyntaxError), an unreachable
 * `return job.id`, and references to out-of-scope `jobId`/`job`.
 *
 * @param url - stream page URL (e.g. chaturbate.com/projektmelody)
 * @param recordId - db id of the records row; reused as the Record's jobId
 * @param abortSignal - signal used to cancel the recording mid-flight
 * @returns the Record instance whose upload is already in progress
 *
 * NOTE(review): record.start() is intentionally not awaited — the caller polls
 * for abort while the recording runs. Confirm Record exposes a way to await
 * completion.
 */
async function getRecording(url: string, recordId: number, abortSignal: AbortSignal) {
  const accessKeyId = process.env.S3_ACCESS_KEY_ID!;
  const secretAccessKey = process.env.S3_SECRET_ACCESS_KEY!;
  const region = process.env.S3_REGION!;
  const endpoint = process.env.S3_ENDPOINT!;
  const bucket = process.env.S3_BUCKET!;
  // resolve the HLS playlist for the stream page url
  const playlistUrl = await getPlaylistUrl(url)
  const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint })
  const inputStream = Record.getFFmpegStream({ url: playlistUrl })
  // use the db record id as the job id so the upload is traceable to the record row
  const record = new Record({ inputStream, bucket, s3Client, jobId: ''+recordId, abortSignal })
  record.start()
  return record
}
// NOTE(review): residue from an interleaved diff of the older pg-boss worker —
// `backOffOptions` is cut off mid-object here and the loop body resurfaces inside
// checkIfAborted() below. Confirm against the repository and remove.
export default async function main (jobs: RecordJob[]): Promise<any> {
// @todo why are we passed multiple jobs? I'm expecting only one.
const backOffOptions = {
numOfAttempts: 5,
startingDelay: 5000,
retry: (e: any, attemptNumber: number) => {
console.log(`Record Job is retrying. Attempt number ${attemptNumber}. e=${JSON.stringify(e, null, 2)}`)
return true
async function checkIfAborted(recordId: number): Promise<boolean> {
const res = await fetch(`${process.env.POSTGREST_URL}/records?id.eq=${recordId}`, {
headers: {
'Content-Type': 'application/json',
'Accepts': 'application/json'
}
})
if (!res.ok) {
throw new Error(`failed to checkIfAborted. status=${res.status}, statusText=${res.statusText}`);
}
for (const j of jobs) {
console.log(`record job ${j.id} GO GO GO`)
try {
await backOff(() => _record(j), backOffOptions)
} catch (e) {
console.warn(`record job ${j.id} encountered the following error.`)
console.error(e)
}
console.log(`record job ${j.id} is finished.`)
const body = await res.json() as RecordingRecord[];
if (!body[0]) throw new Error(`failed to get a record that matched recordId=${recordId}`)
return body[0].isAborted
}
export const record: Task = async function (payload, helpers) {
assertPayload(payload)
assertEnv()
const { url, recordId } = payload
const abortController = new AbortController()
let interval
try {
const record = await getRecording(url, recordId, abortController.signal)
// every 30s, poll db to see if our job has been aborted by the user
interval = setInterval(async () => {
const isAborted = await checkIfAborted(recordId)
if (isAborted) {
abortController.abort()
}
}, 30000)
} finally {
clearInterval(interval)
}
};
// const recordId = await createRecordingRecord(payload, helpers)
// const { url } = payload;
// console.log(`@todo simulated start_recording with url=${url}, recordId=${recordId}`)
// await helpers.addJob('record', { url, recordId })
}
// // export default record
// export default function (payload: Payload, helpers: Helpers) {
// helpers.logger.info('WHEEEEEEEEEEEEEEEE (record.ts task executor)')
// }
export default record

View File

@ -0,0 +1,77 @@
import Record from '../Record.ts'
import { getPlaylistUrl } from '@futureporn/scout/ytdlp.ts'
import 'dotenv/config'
import { type Job } from 'pg-boss'
import { backOff } from 'exponential-backoff'
/**
 * A pg-boss Job whose data carries the stream page URL to record.
 * url presumably looks like chaturbate.com/<channel> — TODO confirm expected format.
 */
export interface RecordJob extends Job {
data: {
url: string;
}
}
/**
 * Record a single livestream capture job from pg-boss.
 * Resolves the HLS playlist for job.data.url, pipes it through ffmpeg, and
 * streams the output to S3 via the Record helper.
 * @param job - pg-boss job whose data.url is the stream page to record
 * @param retries - unused; kept for interface compatibility
 * @returns the job id once the recording completes
 * @throws when required S3_* env vars are missing, the job is empty,
 *         or the playlist cannot be resolved
 */
async function _record (job: RecordJob, retries?: number): Promise<string> {
  // fail fast when the S3 configuration is incomplete (checked in this order)
  for (const name of ['S3_BUCKET_NAME', 'S3_ENDPOINT', 'S3_REGION', 'S3_ACCESS_KEY_ID', 'S3_SECRET_ACCESS_KEY']) {
    if (!process.env[name]) throw new Error(`${name} was undefined in env`);
  }
  if (!job) throw new Error('Job sent to job worker execution callback was empty!!!');
  const { url } = job.data;
  console.log(`'record' job ${job!.id} begin with url=${url}`)
  let playlistUrl
  try {
    playlistUrl = await getPlaylistUrl(url)
    console.log(`playlistUrl=${playlistUrl}`)
  } catch (e) {
    console.error('error during getPlaylistUrl()')
    console.error(e)
    throw e
  }
  const s3Client = Record.makeS3Client({
    accessKeyId: process.env.S3_ACCESS_KEY_ID!,
    secretAccessKey: process.env.S3_SECRET_ACCESS_KEY!,
    region: process.env.S3_REGION!,
    endpoint: process.env.S3_ENDPOINT!
  })
  const inputStream = Record.getFFmpegStream({ url: playlistUrl })
  const record = new Record({ inputStream, bucket: process.env.S3_BUCKET_NAME!, s3Client, jobId: job.id })
  await record.start()
  console.log(`record job ${job.id} complete`)
  return job.id
}
/**
 * pg-boss work handler: records each job it is handed, sequentially, retrying
 * each one with exponential backoff (5 attempts starting at a 5s delay).
 * Failures are logged and do not stop the remaining jobs.
 */
export default async function main (jobs: RecordJob[]): Promise<any> {
  // @todo why are we passed multiple jobs? I'm expecting only one.
  // retry every failure, up to numOfAttempts times
  const retryOptions = {
    numOfAttempts: 5,
    startingDelay: 5000,
    retry(e: any, attemptNumber: number) {
      console.log(`Record Job is retrying. Attempt number ${attemptNumber}. e=${JSON.stringify(e, null, 2)}`)
      return true
    }
  }
  for (const job of jobs) {
    console.log(`record job ${job.id} GO GO GO`)
    try {
      await backOff(() => _record(job), retryOptions)
    } catch (e) {
      console.warn(`record job ${job.id} encountered the following error.`)
      console.error(e)
    }
    console.log(`record job ${job.id} is finished.`)
  }
};

View File

@ -0,0 +1,60 @@
import { Helpers, type Task } from 'graphile-worker'
/**
 * Payload for the start_recording task.
 * url is the URL to be recorded. Ex: chaturbate.com/projektmelody
 * discordMessageId is the ID of the discord message which displays recording status.
 * we use the ID to update the message later, and/or relate button press events to this record task
 * NOTE(review): isAborted is declared here but assertPayload() does not validate
 * it and createRecordingRecord() hard-codes it to false — confirm callers
 * actually supply it, otherwise drop it from this interface.
 */
interface Payload {
url: string;
discordMessageId: string;
isAborted: boolean;
}
/**
 * Narrow an unknown task payload to the start_recording Payload shape.
 * Throws when the payload is missing or url/discordMessageId are not strings.
 */
function assertPayload(payload: any): asserts payload is Payload {
  const isObject = typeof payload === "object" && payload !== null;
  if (!isObject) throw new Error("invalid payload");
  if (typeof payload.url !== "string") throw new Error("invalid url");
  if (typeof payload.discordMessageId !== "string") throw new Error("invalid discordMessageId");
}
/**
 * Ensure the automation JWT required for authenticated PostgREST writes is set.
 */
function assertEnv() {
  const jwt = process.env.AUTOMATION_USER_JWT;
  if (!jwt) throw new Error('AUTOMATION_USER_JWT was missing in env');
}
/**
 * Create a row in the records table via PostgREST and return its id.
 * Uses `Prefer: return=headers-only`, so the new row's id comes back in the
 * Location response header (e.g. "/records?id=eq.42").
 *
 * Fixes: `res.headers.Location` is always undefined on a fetch Headers object
 * (headers must be read with .get()), so the original crashed with a TypeError;
 * it also returned a string despite declaring Promise<number>, and passed a
 * Headers object to a logger that expects a string.
 *
 * @param payload - validated task payload (url + discordMessageId)
 * @param helpers - graphile-worker helpers (used for logging)
 * @returns the numeric id of the newly created record row
 * @throws when the POST fails or the Location header is missing/unparsable
 */
async function createRecordingRecord(payload: Payload, helpers: Helpers): Promise<number> {
  const { url, discordMessageId } = payload
  const record = {
    url,
    discordMessageId,
    isAborted: false
  }
  // NOTE(review): endpoint is hard-coded while sibling code uses POSTGREST_URL
  // from env — consider unifying.
  const res = await fetch('http://postgrest.futureporn.svc.cluster.local:9000/records', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${process.env.AUTOMATION_USER_JWT}`,
      'Prefer': 'return=headers-only'
    },
    body: JSON.stringify(record)
  })
  if (!res.ok) {
    const status = res.status
    const statusText = res.statusText
    throw new Error(`fetch failed to create recording record in database. status=${status}, statusText=${statusText}`)
  }
  // Headers must be read via .get() (case-insensitive); property access is always undefined.
  const location = res.headers.get('location')
  if (!location) throw new Error('failed to create recording record: Location header missing from PostgREST response')
  helpers.logger.info(`createRecordingRecord Location=${location}`)
  // Location looks like "/records?id=eq.<id>" — the id is the text after the last '.'
  const id = Number(location.split('.').at(-1))
  if (Number.isNaN(id)) throw new Error(`failed to parse record id from Location header: ${location}`)
  return id
}
/**
 * start_recording -- graphile-worker task. Creates a row in the records table
 * for this recording, then enqueues the 'record' job that performs the capture.
 */
export const startRecording: Task = async (payload, helpers) => {
  assertPayload(payload)
  assertEnv()
  const recordId = await createRecordingRecord(payload, helpers)
  const { url } = payload;
  console.log(`@todo simulated start_recording with url=${url}, recordId=${recordId}`)
  await helpers.addJob('record', { url, recordId })
}
export default startRecording

View File

@ -0,0 +1,18 @@
import { type Task } from 'graphile-worker'
interface Payload {
id: string
}
/**
 * Narrow an unknown task payload to { id: string }; throws otherwise.
 */
function assertPayload(payload: any): asserts payload is Payload {
  const isObject = typeof payload === "object" && payload !== null;
  if (!isObject) throw new Error("invalid payload");
  if (typeof payload.id !== "string") throw new Error("invalid id");
}
/**
 * stop_recording -- graphile-worker task. Currently a stub: validates the
 * payload and logs; the actual stop logic is still TODO upstream.
 */
export const stopRecording: Task = async (payload) => {
  assertPayload(payload)
  const { id } = payload;
  console.log(`@todo simulated stop_recording with id=${id}`)
}

View File

@ -2,20 +2,26 @@
-- NOTE(review): this span appears to be a rendered diff without +/- markers; the
-- two CREATE TABLE api.discord_interactions statements below are the before/after
-- versions of the same table. Running both fails on the second ("relation already
-- exists"). Confirm against the repository and keep only the IDENTITY version.
-- example: api.discord_interactions becomes accessible at localhost:9000/discord_interactions
CREATE schema api;
-- schema for @futureporn/capture and @futureporn/bot
-- NOTE(review): older table definition (plain int PK, no identity) — likely the
-- removed side of the diff.
CREATE TABLE api.discord_interactions (
id int PRIMARY KEY,
discord_message_id text NOT NULL,
capture_job_id text NOT NULL
);
-- authenticator is the role which can "impersonate" other users.
CREATE ROLE authenticator LOGIN NOINHERIT NOCREATEDB NOCREATEROLE NOSUPERUSER;
-- anonymous is the role assigned to anonymous web requests
-- NOTE(review): both `anonymous` here and `anonymous_user` below are created;
-- confirm which one is current and drop the other.
CREATE ROLE anonymous NOLOGIN;
-- roles & users for our @futureporn/capture user
CREATE ROLE capture_user NOLOGIN;
GRANT capture_user TO authenticator;
GRANT usage ON SCHEMA api TO capture_user;
GRANT ALL ON api.discord_interactions TO capture_user;
-- schema for @futureporn/capture and @futureporn/bot
CREATE TABLE api.discord_interactions (
id int PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
discord_message_id text NOT NULL,
capture_job_id text NOT NULL
);
-- roles & permissions for our backend automation user
CREATE ROLE automation_user NOLOGIN;
GRANT automation_user TO authenticator;
GRANT usage ON SCHEMA api TO automation_user;
GRANT all ON api.discord_interactions TO automation_user;
-- role & permissions for anonymous web user
CREATE ROLE anonymous_user NOLOGIN;
GRANT usage on schema api TO anonymous_user;
GRANT SELECT ON api.discord_interactions TO anonymous_user;

View File

@ -0,0 +1,10 @@
-- schema for @futureporn/capture and @futureporn/bot
-- The worker code reads/writes these columns as camelCase JSON keys through
-- PostgREST ("discordMessageId", "isAborted"). Unquoted identifiers would be
-- folded to lowercase by Postgres (discordmessageid/isaborted), making the
-- PostgREST column names disagree with the application code, so they are quoted.
CREATE TABLE api.records (
    id INT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
    url TEXT NOT NULL,
    "discordMessageId" TEXT NOT NULL,
    -- NOT NULL added: the abort-poll code reads this flag as a boolean and a
    -- NULL row would break it; the app always inserts an explicit false anyway.
    "isAborted" BOOLEAN NOT NULL DEFAULT FALSE
);
-- roles & permissions for our backend automation user
GRANT all ON api.records TO automation_user;
-- checkIfAborted() polls this table without an Authorization header, so the
-- anonymous reader needs SELECT (mirrors the discord_interactions grant).
GRANT SELECT ON api.records TO anonymous_user;

View File

@ -1,7 +1,7 @@
{
"name": "@futureporn/migrations",
"type": "module",
"version": "0.0.1",
"version": "0.0.2",
"description": "",
"main": "index.js",
"scripts": {