implement record retrying
ci / build (push) Failing after 1s Details

This commit is contained in:
CJ_Clippy 2024-08-19 20:27:40 -08:00
parent 9cd9b6a53d
commit f342bf9671
35 changed files with 728 additions and 653 deletions

View File

@ -432,6 +432,13 @@ k8s_resource(
link('https://game-2048.fp.sbtp.xyz/')
]
)
k8s_resource(
workload='whoami',
labels=['frontend'],
links=[
link('https://whoami.fp.sbtp.xyz/')
]
)
k8s_resource(
workload='postgresql-primary',
port_forwards=['5432'],

View File

@ -52,6 +52,11 @@ spec:
secretKeyRef:
name: postgrest
key: automationUserJwt
- name: HTTP_PROXY
valueFrom:
secretKeyRef:
name: capture
key: httpProxy
- name: POSTGREST_URL
value: "{{ .Values.postgrest.url }}"
- name: PORT

View File

@ -6,13 +6,14 @@ declare namespace Futureporn {
type PlatformNotificationType = 'email' | 'manual' | 'twitter'
type ArchiveStatus = 'good' | 'issue' | 'missing'
type RecordingState = 'pending' | 'recording' | 'stalled' | 'aborted' | 'failed' | 'finished'
type Status = Partial<RecordingState>
type RecordingState = 'recording' | 'stalled' | 'aborted' | 'failed' | 'finished'
type Status = Partial<'pending_recording' | RecordingState>
interface Stream {
id: string;
url: string;
platform_notification_type: PlatformNotificationType;
discord_message_id: string;
date: Date;
created_at: Date;
updated_at: Date;
@ -23,6 +24,7 @@ declare namespace Futureporn {
is_fansly_stream: Boolean;
is_recording_aborted: Boolean;
status: Status;
segments?: Segment[]
}
interface RecordingRecord {
@ -41,6 +43,8 @@ declare namespace Futureporn {
s3_id: string;
bytes: number;
stream?: Stream[];
created_at: Date;
updated_at: Date;
}

View File

@ -71,7 +71,8 @@ kubectl --namespace futureporn delete secret capture --ignore-not-found
kubectl --namespace futureporn create secret generic capture \
--from-literal=workerConnectionString=${WORKER_CONNECTION_STRING} \
--from-literal=s3AccessKeyId=${S3_USC_BUCKET_KEY_ID} \
--from-literal=s3SecretAccessKey=${S3_USC_BUCKET_APPLICATION_KEY}
--from-literal=s3SecretAccessKey=${S3_USC_BUCKET_APPLICATION_KEY} \
--from-literal=httpProxy=${HTTP_PROXY}
kubectl --namespace futureporn delete secret mailbox --ignore-not-found
kubectl --namespace futureporn create secret generic mailbox \

View File

@ -15,4 +15,4 @@
## every n minutes, we see which /records are stale and we mark them as such.
## this prevents stalled Record updates by marking stalled recordings as stopped
* * * * * expire_stream_recordings ?max=1 { idle_minutes:2 }
* * * * * update_stream_statuses ?max=1 { stalled_minutes:1 }

View File

@ -36,6 +36,7 @@ bot.transformers.desiredProperties.interaction.token = true
bot.transformers.desiredProperties.interaction.guildId = true
bot.transformers.desiredProperties.interaction.member = true
bot.transformers.desiredProperties.interaction.message = true
bot.transformers.desiredProperties.interaction.user = true
bot.transformers.desiredProperties.message.activity = true
bot.transformers.desiredProperties.message.id = true

View File

@ -0,0 +1,14 @@
import { EventEmitter } from 'node:events'
import type { Interaction } from '@discordeno/bot'
export class ItemCollector extends EventEmitter {
onItem(callback: (item: Interaction) => unknown): void {
this.on('item', callback)
}
collect(item: Interaction): void {
this.emit('item', item)
}
}
export default ItemCollector

View File

@ -0,0 +1,51 @@
import { ApplicationCommandTypes, type Interaction } from '@discordeno/bot'
import type { Status } from '@futureporn/types'
import { createCommand } from '../commands.ts'
import { bot } from '../bot.ts'
import { configs } from '../config.ts'
createCommand({
name: 'cancel',
description: 'Cancel a recording',
type: ApplicationCommandTypes.ChatInput,
async execute(interaction: Interaction) {
bot.logger.info(`cancel command is executing now.`)
const message = interaction.message
if (!message) return bot.logger.error('interaction.message was missing');
if (!message.id) return bot.logger.error(`interaction.message.id was missing`);
const url = `${configs.postgrestUrl}/streams?discord_message_id=eq.${message.id}`;
const options = {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Prefer': 'return=representation',
'Authorization': `Bearer ${configs.automationUserJwt}`
},
body: JSON.stringify({
is_recording_aborted: true,
status: 'aborted' as Status
})
};
let streamId: string;
try {
const response = await fetch(url, options);
bot.logger.info(`response.ok=${response.ok}`)
const data: any = await response.json();
streamId = data?.at(0).id
bot.logger.info(interaction.user);
interaction.respond(`<@${interaction.user.id}> cancelled recording on Stream ${streamId}`, { isPrivate: false })
bot.logger.info(`Cancel command successfully ran on message.id=${message.id}`)
} catch (error) {
bot.logger.error('error encountered while cancelling job')
bot.logger.error(error);
}
},
})

View File

@ -4,9 +4,11 @@ import {
type Interaction,
EmbedsBuilder,
type InteractionCallbackData,
logger,
} from '@discordeno/bot'
import { createCommand } from '../commands.ts'
import { configs } from '../config.ts'
import type { Stream } from '@futureporn/types'
async function createStreamInDatabase(url: string, discordMessageId: string) {
@ -19,15 +21,16 @@ async function createStreamInDatabase(url: string, discordMessageId: string) {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Prefer': 'return=headers-only',
'Authorization': `Bearer ${configs.automationUserJwt}`,
'Prefer': 'return=headers-only'
},
body: JSON.stringify(streamPayload)
})
if (!res.ok) {
const status = res.status
const statusText = res.statusText
const msg = `fetch failed to create stream in database. status=${status}, statusText=${statusText}`
const body = await res.text()
const msg = `failed to create stream in database. status=${status}, statusText=${statusText}, body=${body}`
console.error(msg)
throw new Error(msg)
}
@ -36,6 +39,47 @@ async function createStreamInDatabase(url: string, discordMessageId: string) {
return parseInt(id)
}
async function getUrlFromMessage(interaction: Interaction): Promise<string|null> {
const messageId = interaction.message?.id
const pgRequestUrl = `${configs.postgrestUrl}/streams?discord_message_id=eq.${messageId}`
logger.info(`pgRequestUrl=${pgRequestUrl}`)
const requestOptions = {
method: 'GET',
headers: {
'Authorization': `Bearer ${configs.automationUserJwt}`,
'Content-Type': 'application/json',
'Prefer': 'return=representation'
}
}
try {
const res = await fetch (pgRequestUrl, requestOptions)
if (!res.ok) {
const body = await res.json()
logger.error(body)
throw new Error(`Problem during getOptionsMessage. res.status=${res.status}, res.statusText=${res.statusText}`)
}
const json = await res.json() as Stream[]
const stream = json[0]
const url = stream?.url
if (!url) return null
else return url
} catch (e) {
logger.error(e)
throw e
}
}
async function getUrlFromData(interaction: Interaction): Promise<string|null> {
if (!interaction) throw new Error('interaction arg passed to getOptions was missing');
const url = (interaction.data?.options?.find(o => o.name === 'url'))?.value
if (!url) return null;
return String(url)
}
createCommand({
name: 'record',
description: 'Record a livestream.',
@ -48,43 +92,50 @@ createCommand({
},
],
async execute(interaction: Interaction) {
logger.info('logger.info hello? record command is running now`)')
await interaction.defer()
// console.log('interation.data as follows')
// console.log(interaction.data)
const options = interaction.data?.options
if (!options) throw new Error(`interaction options was undefined. it's expected to be an array of options.`);
const urlOption = options.find((o) => o.name === 'url')
if (!urlOption) throw new Error(`url option was missing from interaction data`);
const url = ''+urlOption.value
if (!url) throw new Error(`url was missing from interaction data options`);
try {
// The url can come from one of two places.
// interaction.data.options, or interaction.message?.embeds
let url
url = await getUrlFromData(interaction)
logger.info(`getUrlFromData url=${url}`)
if (!url) {
url = await getUrlFromMessage(interaction)
logger.info(`getUrlFromMessage url=${url}`)
}
logger.info(`url=${url}`)
if (!url) throw new Error('Neither the interaction data nor the message embed contained a URL.');
// respond to the interaction and get a message ID which we will then add to the database Record
const embeds = new EmbedsBuilder()
.setTitle(`Record ⋅`)
.setDescription('Waiting for a worker to start the job.')
.setFields([
{ name: 'Status', value: 'Pending', inline: true },
{ name: 'Filesize', value: '0 bytes', inline: true},
{ name: 'URL', value: url, inline: false }
])
.setColor('#808080')
const response: InteractionCallbackData = { embeds }
const message = await interaction.edit(response)
// respond to the interaction and get a message ID which we will then add to the database Record
const embeds = new EmbedsBuilder()
.setTitle(`Stream ⋅`)
.setDescription('Waiting for a worker to start the job.')
.setFields([
{ name: 'Status', value: 'Pending', inline: true },
{ name: 'URL', value: url, inline: true }
])
.setColor('#808080')
const response: InteractionCallbackData = { embeds }
const message = await interaction.edit(response)
// console.log('deferred, interaction message is as follows')
// console.log(message)
if (!message?.id) {
const msg = `message.id was empty, ruh roh raggy`
console.error(msg)
throw new Error(msg)
if (!message?.id) {
const msg = `message.id was empty, ruh roh raggy`
console.error(msg)
throw new Error(msg)
}
// @todo create stream in db
const stream = await createStreamInDatabase(url, message.id.toString())
logger.info(stream)
} catch (e) {
await interaction.edit(`Record failed due to the following error.\n${e}`)
}
// @todo create record in db
const record = await createStreamInDatabase(url, message.id.toString())
// console.log(record)
}
})

View File

@ -0,0 +1,20 @@
import { ApplicationCommandTypes, type Interaction } from '@discordeno/bot'
import { createCommand } from '../commands.ts'
import { bot } from '../bot.ts'
createCommand({
name: 'yeah',
description: 'Yeah! a message',
type: ApplicationCommandTypes.ChatInput,
async execute(interaction: Interaction) {
// interaction.message.id
const message = interaction.message
if (!message) return bot.logger.error('interaction.message was missing');
if (!message.id) return bot.logger.error(`interaction.message.id was missing`);
interaction.respond('https://futureporn-b2.b-cdn.net/yeah_nobg.png', { isPrivate: true })
bot.logger.info(`Yeah! command successfully ran with message.id=${message.id}`)
},
})

View File

@ -0,0 +1 @@
Handlers for Message Component interactions such as button presses

View File

@ -1,22 +1,50 @@
import { InteractionTypes, commandOptionsParser, type Interaction } from '@discordeno/bot'
import { bot } from '../bot.ts'
import { commands } from '../commands.ts'
import { commands, type Command } from '../commands.ts'
import ItemCollector from '../collector.ts'
bot.events.interactionCreate = async (interaction: Interaction) => {
if (!interaction.data || interaction.type !== InteractionTypes.ApplicationCommand) return
const command = commands.get(interaction.data.name)
if (!command) {
bot.logger.error(`Command ${interaction.data.name} not found`)
return
}
export const collectors = new Set<ItemCollector>()
const execCommand = async function execCommand(command: Command, interaction: Interaction) {
const options = commandOptionsParser(interaction)
try {
await command.execute(interaction, options)
} catch (error) {
bot.logger.error(`There was an error running the ${command.name} command.`, error)
}
}
const handleApplicationCommand = async function handleApplicationCommand (interaction: Interaction) {
if (!interaction.data) return
const command = commands.get(interaction.data.name)
if (!command) {
bot.logger.error(`Command ${interaction.data.name} (customId=${interaction.data.customId}) not found`)
return
}
execCommand(command, interaction)
}
const handleMessageComponent = async function handleMessageComponent (interaction: Interaction) {
if (!interaction.data) return
if (!interaction.data.customId) return
const command = commands.get(interaction.data.customId)
if (!command) return bot.logger.error(`Command ${interaction.data.customId} not found`);
execCommand(command, interaction)
}
bot.events.interactionCreate = async (interaction: Interaction) => {
if (interaction.type === InteractionTypes.ApplicationCommand) {
await handleApplicationCommand(interaction)
} else if (interaction.type === InteractionTypes.MessageComponent) {
await handleMessageComponent(interaction)
} else {
bot.logger.info(`received interaction of type=${interaction.type}`)
}
}

View File

@ -1,9 +0,0 @@
import { Client, Events, type Interaction } from 'discord.js';
export default {
name: Events.ClientReady,
once: true,
execute(client: Client) {
console.log(`Ready! Logged in as ${client?.user?.tag}`);
}
}

View File

@ -1,18 +0,0 @@
import { type CreateApplicationCommand, type CreateSlashApplicationCommand, type Interaction } from '@discordeno/bot'
import record from '../commands/record.ts'
import donger from '../commands/donger.ts'
export const commands = new Map<string, Command>(
[
record,
donger
].map(cmd => [cmd.name, cmd]),
)
export default commands
export interface Command extends CreateSlashApplicationCommand {
/** Handler that will be executed when this command is triggered */
execute(interaction: Interaction, args: Record<string, any>): Promise<any>
}

View File

@ -1,25 +0,0 @@
import 'dotenv/config';
import { REST, Routes } from 'discord.js';
if (!process.env.DISCORD_APPLICATION_ID) throw new Error('DISCORD_APPLICATION_ID was undefined in env');
if (!process.env.DISCORD_GUILD_ID) throw new Error('DISCORD_GUILD_ID was undefined in env');
if (!process.env.DISCORD_TOKEN) throw new Error('DISCORD_TOKEN was undefined in env');
// Construct and prepare an instance of the REST module
const rest = new REST({ version: '9' }).setToken(process.env.DISCORD_TOKEN);
export default async function deployCommands(commands: any[]): Promise<void> {
try {
// console.log(`Started refreshing ${commands.length} application (/) commands.`);
// and deploy your commands!
const data: any = await rest.put(
Routes.applicationGuildCommands(process.env.DISCORD_APPLICATION_ID!, process.env.DISCORD_GUILD_ID!),
{ body: commands },
);
// console.log(`Successfully reloaded ${data.length} application (/) commands.`);
} catch (error) {
// And of course, make sure you catch and log any errors!
console.error(error);
}
}

View File

@ -1,8 +0,0 @@
import type { EventHandlers } from '@discordeno/bot'
import { event as interactionCreateEvent } from './interactionCreate.ts.old'
export const events = {
interactionCreate: interactionCreateEvent,
} as Partial<EventHandlers>
export default events

View File

@ -1,71 +0,0 @@
import { Events, type Interaction, Client, Collection } from 'discord.js';
import type { WorkerUtils } from 'graphile-worker';
interface ExtendedClient extends Client {
commands: Collection<string, any>
}
export default {
name: Events.InteractionCreate,
once: false,
async execute(interaction: Interaction, workerUtils: WorkerUtils) {
// if (!interaction.isChatInputCommand()) return;
// console.log(interaction.client)
// const command = interaction.client.commands.get(interaction.commandName);
if (interaction.isButton()) {
console.log(`the interaction is a button type with customId=${interaction.customId}, message.id=${interaction.message.id}, user=${interaction.user.id} (${interaction.user.globalName})`)
if (interaction.customId === 'stop') {
interaction.reply(`Stopped by @${interaction.user.id}`)
workerUtils.addJob('stop_recording', { discordMessageId: interaction.message.id, userId: interaction.user.id }, { maxAttempts: 1 })
} else if (interaction.customId === 'retry') {
interaction.reply(`Retried by @${interaction.user.id}`)
workerUtils.addJob('start_recording', { discordMessageId: interaction.message.id, userId: interaction.user.id }, { maxAttempts: 3 })
} else {
console.error(`this button's customId=${interaction.customId} did not match one of the known customIds`)
}
} else if (interaction.isChatInputCommand()) {
console.log(`the interaction is a ChatInputCommandInteraction with commandName=${interaction.commandName}, user=${interaction.user.id} (${interaction.user.globalName})`)
const client = interaction.client as ExtendedClient
const command = client.commands.get(interaction.commandName);
if (!command) {
console.error(`No command matching ${interaction.commandName} was found.`);
return;
}
command.execute({ interaction, workerUtils })
}
},
};
// const { Events } = require('discord.js');
// module.exports = {
// name: Events.ClientReady,
// once: true,
// execute(client) {
// console.log(`Ready! Logged in as ${client.user.tag}`);
// },
// };
// client.on(Events.InteractionCreate, interaction => {
// if (interaction.isChatInputCommand()) {
// const { commandName } = interaction;
// console.log(`Received interaction with commandName=${commandName}`)
// const cmd = commands.find((c) => c.data.name === commandName)
// if (!cmd) {
// console.log(`no command handler matches commandName=${commandName}`)
// return;
// }
// cmd.execute({ interaction, workerUtils })
// } else {
// // probably a ButtonInteraction
// console.log(interaction)
// }
// });

View File

@ -1,31 +0,0 @@
import * as path from 'node:path';
import * as fs from 'node:fs';
import { dirname } from 'node:path';
import { fileURLToPath } from 'url';
const __dirname = dirname(fileURLToPath(import.meta.url));
export default async function loadCommands(): Promise<any[]> {
const commands: any[] = [];
// console.log('Grab all the command folders from the commands directory you created earlier')
const foldersPath = path.join(__dirname, 'commands');
const commandFolders = fs.readdirSync(foldersPath);
for (const folder of commandFolders) {
const commandsPath = path.join(foldersPath, folder);
const commandFiles = fs.readdirSync(commandsPath).filter(file => file.endsWith('.ts') || file.endsWith('.js'));
console.log(`commandFiles=${commandFiles}`);
// console.log(`Grab the SlashCommandBuilder#toJSON() output of each command's data for deployment`)
for (const file of commandFiles) {
const filePath = path.join(commandsPath, file);
const command = (await import(filePath)).default;
// console.log(command)
if (command?.data && command?.execute) {
commands.push(command);
} else {
console.log(`[WARNING] The command at ${filePath} is missing a required "data" or "execute" property.`);
}
}
}
return commands;
}

View File

@ -1,23 +0,0 @@
import * as path from 'node:path';
import * as fs from 'node:fs';
import { dirname } from 'node:path';
import { fileURLToPath } from 'url';
import type { Client } from 'discord.js';
import type { WorkerUtils } from 'graphile-worker';
const __dirname = dirname(fileURLToPath(import.meta.url));
export default async function loadEvents(client: Client, workerUtils: WorkerUtils) {
console.log(`loading events`);
const eventsPath = path.join(__dirname, 'events');
const eventFiles = fs.readdirSync(eventsPath).filter(file => file.endsWith('.ts') || file.endsWith('.js'));
console.log(`eventFiles=${eventFiles}`);
for (const file of eventFiles) {
const filePath = path.join(eventsPath, file);
const event = (await import(filePath)).default;
if (event.once) {
client.once(event.name, (...args) => event.execute(...args, workerUtils));
} else {
client.on(event.name, (...args) => event.execute(...args, workerUtils));
}
}
}

View File

@ -1,24 +0,0 @@
import { Client, Events, MessageReaction, User, type Interaction } from 'discord.js';
export default {
name: Events.MessageReactionAdd,
once: false,
async execute(reaction: MessageReaction, user: User) {
// When a reaction is received, check if the structure is partial
if (reaction.partial) {
// If the message this reaction belongs to was removed, the fetching might result in an API error which should be handled
try {
await reaction.fetch();
} catch (error) {
console.error('Something went wrong when fetching the message:', error);
// Return as `reaction.message.author` may be undefined/null
return;
}
}
// Now the message has been cached and is fully available
console.log(`${reaction.message.author}'s message "${reaction.message.content}" gained a reaction!`);
// The reaction is now also fully available and the properties will be reflected accurately:
console.log(`${reaction.count} user(s) have given the same reaction to this message!`);
}
}

View File

@ -1,12 +0,0 @@
import 'dotenv/config'
import { bot } from '../index.js'
import donger from '../commands/donger.js'
import record from '../commands/record.js'
const guildId = process.env.DISCORD_GUILD_ID!
const commands = [
donger,
record
]
await bot.rest.upsertGuildApplicationCommands(guildId, commands)

View File

@ -1,45 +0,0 @@
import { type ChatInputCommandInteraction, SlashCommandBuilder, Message } from 'discord.js';
const dongers: string[] = [
'( ͡ᵔ ͜ʖ ͡ᵔ )',
'¯\_(ツ)_/¯',
'(๑>ᴗ<๑)',
'(̿▀̿ ̿Ĺ̯̿̿▀̿ ̿)',
'( ͡° ͜ʖ ͡°)',
'٩(͡๏̯͡๏)۶',
'ლ(´◉❥◉`ლ)',
'( ゚Д゚)',
'ԅ( ͒ ۝ ͒ )ᕤ',
'( ͡ᵔ ͜ʖ ͡°)',
'( ͠° ͟ʖ ͡°)╭∩╮',
'༼ つ ❦౪❦ ༽つ',
'( ͡↑ ͜ʖ ͡↑)',
'(ভ_ ভ) ރ / ┊ \',
'ヽ(⌐□益□)ノ',
'༼ つ ◕‿◕ ༽つ',
'ヽ(⚆෴⚆)ノ',
'(つ .•́ _ʖ •̀.)つ',
'༼⌐■ل͟■༽',
'┬─┬ノ( ͡° ͜ʖ ͡°ノ)',
'༼⁰o⁰༽꒳ᵒ꒳ᵎᵎᵎ',
'( -_・) ▄︻̷̿┻̿═━一',
'【 º ᗜ º 】',
'ᕦ(✧╭╮✧)ᕥ',
'┗( TT )┛',
'(Φ ᆺ Φ)',
'(TдT)',
'☞(◉▽◉)☞'
];
export default {
data: new SlashCommandBuilder()
.setName('donger')
.setDescription('Replies with a free donger!'),
async execute({ interaction }: { interaction: ChatInputCommandInteraction}): Promise<void> {
await interaction.reply({
content: dongers[Math.floor(Math.random()*dongers.length)]
});
},
};

View File

@ -1,144 +0,0 @@
import type { ExecuteArguments } from '../../index.js';
if (!process.env.AUTOMATION_USER_JWT) throw new Error(`AUTOMATION_USER_JWT was missing from env`);
export default {
data: new SlashCommandBuilder()
.setName('record')
.setDescription('Record a livestream.')
.addStringOption((option) =>
option.setName('url')
.setMaxLength(1024)
.setDescription('The channel URL to record')
.setRequired(true)
),
async execute({ interaction, workerUtils }: ExecuteArguments): Promise<void> {
const url = interaction.options.getString('url')
// const row = new ActionRowBuilder<ButtonBuilder>()
// .addComponents(component);
// {
// content: `Button`,
// components: [
// new ActionRowBuilder<MessageActionRowComponentBuilder>().addComponents([
// new ButtonBuilder()
// .setCustomId('click/12345')
// .setLabel('LABEL')
// .setStyle(ButtonStyle.Primary)
// ])
// ]
// cols can be 5 high
// rows can be 5 wide
const statusEmbed = new EmbedBuilder()
.setTitle('Pending')
.setDescription('Waiting for a worker to accept the job.')
.setColor(2326507)
const buttonRow = new ActionRowBuilder<MessageActionRowComponentBuilder>()
.addComponents([
new ButtonBuilder()
.setCustomId('stop')
.setLabel('Stop Recording')
.setEmoji('🛑')
.setStyle(ButtonStyle.Danger),
]);
// const embed = new EmbedBuilder().setTitle('Attachments');
const idk = await interaction.reply({
content: `/record ${url}`,
embeds: [
statusEmbed
],
components: [
buttonRow
]
});
// console.log('the following is idk, the return value from interaction.reply')
// console.log(idk)
const message = await idk.fetch()
const discordMessageId = message.id
await workerUtils.addJob('start_recording', { url, discordMessageId }, { maxAttempts: 3 })
},
};
/**
{
"content": "https://chaturbate.com/projektmelody",
"tts": false,
"embeds": [
{
"id": 652627557,
"title": "Pending",
"description": "Waiting for a worker to accept the job.",
"color": 2326507,
"fields": []
},
{
"id": 88893690,
"title": "Recording",
"description": "The stream is being recorded.",
"color": 392960,
"fields": []
},
{
"id": 118185075,
"title": "Aborted",
"description": "The recording was stopped by the user.",
"color": 8289651,
"fields": []
},
{
"id": 954884517,
"title": "Ended",
"description": "The recording has stopped.",
"color": 10855845,
"fields": []
},
{
"id": 64407340,
"description": "",
"fields": [],
"image": {
"url": "https://futureporn-b2.b-cdn.net/ti8ht9bgwj6k783j7hglfg8j_projektmelody-chaturbate-2024-07-18.png"
}
}
],
"components": [
{
"id": 300630266,
"type": 1,
"components": [
{
"id": 320918638,
"type": 2,
"style": 4,
"label": "Stop Recording",
"action_set_id": "407606538",
"emoji": {
"name": "🛑",
"animated": false
}
}
]
}
],
"actions": {
"407606538": {
"actions": []
}
},
"username": "@futureporn/capture",
"avatar_url": "https://cdn.discordapp.com/avatars/1081818302344597506/93892e06c2f94c3ef1043732a49856db.webp?size=128"
}
*/

View File

@ -1,14 +0,0 @@
import { type ChatInputCommandInteraction, SlashCommandBuilder } from 'discord.js';
export default {
data: new SlashCommandBuilder()
.setName('sim-email')
.setDescription('Simulate an incoming platform notification e-mail'),
async execute(interaction: ChatInputCommandInteraction): Promise<void> {
await interaction.reply({
content: 'testing 123 this is sim-email (simEmail.ts)'
});
},
};

View File

@ -1,72 +0,0 @@
import type { Task, Helpers } from "graphile-worker"
import { sub } from 'date-fns'
import type { RecordingRecord, Stream } from "@futureporn/types"
import qs from 'qs'
import fetch from 'node-fetch'
import { configs } from '../config.ts'
interface Payload {
idle_minutes: number;
}
function assertPayload(payload: any): asserts payload is Payload {
if (typeof payload !== "object" || !payload) throw new Error("invalid payload");
if (!payload.idle_minutes) throw new Error(`idle_minutes was absent in the payload`);
if (typeof payload.idle_minutes !== 'number') throw new Error(`idle_minutes parameter was not a number`);
}
export const expire_stream_recordings: Task = async function (payload: unknown, helpers: Helpers) {
assertPayload(payload)
const { idle_minutes } = payload
helpers.logger.info(`expire_stream_recordings has begun. Expring 'recording' and 'pending' streams that haven't been updated in ${idle_minutes} minutes.`)
const url = 'http://postgrest.futureporn.svc.cluster.local:9000/streams'
let streams: Stream[] = []
try {
// 1. identify and update stalled /streams
// Any streams that was updated earlier than n minute ago AND is in 'pending_recording' or 'recording' state is marked as stalled.
const timestamp = sub(new Date(), { minutes: idle_minutes }).toISOString()
const queryOptions = {
updated_at: `lt.${timestamp}`,
or: '(status.eq.pending_recording,status.eq.recording)'
}
const updatePayload = {
updated_at: new Date().toISOString(),
status: 'stalled'
}
helpers.logger.info(JSON.stringify(updatePayload))
const query = qs.stringify(queryOptions)
const res = await fetch (`${url}?${query}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${configs.automationUserJwt}`,
'Prefer': 'return=headers-only'
},
body: JSON.stringify(updatePayload)
})
if (!res.ok) {
const body = await res.text()
helpers.logger.info(JSON.stringify(res.headers))
helpers.logger.error(`Response code was not 200. status=${res.status}, statusText=${res.statusText}`)
helpers.logger.error(body)
return;
}
const body = await res.text()
helpers.logger.info('body as follows')
helpers.logger.info(body)
} catch (e: any) {
if (e instanceof Error) {
helpers.logger.error(`hi there we encountered an error while fetching /streams`)
helpers.logger.error(e.message)
} else {
helpers.logger.error(e)
}
}
}
export default expire_stream_recordings

View File

@ -1,7 +1,7 @@
import 'dotenv/config'
import type { Status } from '@futureporn/types'
import type { Status, Stream, Segment } from '@futureporn/types'
import { type Task, type Helpers } from 'graphile-worker'
import { add } from 'date-fns'
import { intervalToDuration, formatDuration, isBefore, sub, max } from 'date-fns'
import prettyBytes from 'pretty-bytes'
import {
EmbedsBuilder,
@ -9,14 +9,14 @@ import {
type ActionRow,
MessageComponentTypes,
type ButtonComponent,
type InputTextComponent,
type EditMessage,
type Message,
type Embed
} from '@discordeno/bot'
import { bot } from '../bot.ts'
import { configs } from '../config.ts'
const yeahEmojiId = BigInt('1253191939461873756')
interface Payload {
stream_id: number;
}
@ -31,25 +31,17 @@ function assertPayload(payload: any): asserts payload is Payload {
async function editDiscordMessage({ helpers, streamStatus, discordMessageId, url, fileSize, streamId }: { streamId: number, fileSize: number, url: string, helpers: Helpers, streamStatus: Status, discordMessageId: string }) {
async function editDiscordMessage({ helpers, stream }: { stream: Stream, helpers: Helpers }) {
const discordMessageId = stream.discord_message_id
if (!discordMessageId) throw new Error(`discordMessageId was missing!`);
if (typeof discordMessageId !== 'string') throw new Error(`discordMessageId was not a string!`);
// const { captureJobId } = job.data
helpers.logger.info(`editDiscordMessage has begun with discordMessageId=${discordMessageId}, streamStatus=${streamStatus}`)
// const guild = await bot.cache.guilds.get(BigInt(configs.discordGuildId))
// const channel = guild?.channels.get(BigInt(configs.discordChannelId))
// // const channel = await bot.cache.channels.get()
// console.log('channel as follows')
// console.log(channel)
const channelId = BigInt(configs.discordChannelId)
const updatedMessage: EditMessage = {
embeds: getStatusEmbed({ streamStatus, fileSize, streamId, url }),
embeds: getEmbeds(stream),
components: getButtonRow(stream.status)
}
bot.helpers.editMessage(channelId, discordMessageId, updatedMessage)
@ -59,7 +51,7 @@ async function editDiscordMessage({ helpers, streamStatus, discordMessageId, url
async function getStreamFromDatabase(streamId: number) {
const res = await fetch(`${process.env.POSTGREST_URL}/streams?select=*,segment:segments(*)&id=eq.${streamId}`)
const res = await fetch(`${process.env.POSTGREST_URL}/streams?select=*,segments(*)&id=eq.${streamId}`)
if (!res.ok) {
throw new Error(`failed fetching stream ${streamId}. status=${res.status}, statusText=${res.statusText}`)
}
@ -74,7 +66,6 @@ async function getStreamFromDatabase(streamId: number) {
* the most up-to-date status information from the database
*
* Sometimes the update is changing the state, one of Pending|Recording|Aborted|Ended.
* Sometimes the update is updating the Filesize of the recording in-progress
* Sometimes the update is adding a thumbnail image to the message
*/
export const update_discord_message: Task = async function (payload, helpers: Helpers) {
@ -82,113 +73,120 @@ export const update_discord_message: Task = async function (payload, helpers: He
assertPayload(payload)
const { stream_id } = payload
const streamId = stream_id
helpers.logger.info(`update_discord_message() with streamId=${streamId}`)
const stream = await getStreamFromDatabase(streamId)
const { discord_message_id, status, file_size, url } = stream
const streamStatus = status
const discordMessageId = discord_message_id
const fileSize = file_size
editDiscordMessage({ helpers, streamStatus, discordMessageId, url, fileSize, streamId })
// schedule the next update 10s from now, but only if the recording is still happening
if (streamStatus !== 'ended') {
const runAt = add(new Date(), { seconds: 10 })
const streamId = stream.id
await helpers.addJob('update_discord_message', { streamId }, { jobKey: `stream_${streamId}_update_discord_message`, maxAttempts: 3, runAt })
}
// helpers.logger.info(`update_discord_message with streamId=${streamId}. stream=${JSON.stringify(stream)}`)
editDiscordMessage({ helpers, stream })
} catch (e) {
helpers.logger.error(`caught an error during update_discord_message. e=${e}`)
}
}
function getStatusEmbed({
streamStatus, streamId, fileSize, url
}: { fileSize: number, streamStatus: Status, streamId: number, url: string }) {
function getEmbeds(stream: Stream) {
const streamId = stream.id
const url = stream.url
const segments = stream?.segments
const status = stream.status
const embeds = new EmbedsBuilder()
.setTitle(`Stream ${streamId}`)
.setFields([
{ name: 'Status', value: streamStatus.charAt(0).toUpperCase()+streamStatus.slice(1), inline: true },
{ name: 'Filesize', value: prettyBytes(fileSize), inline: true },
{ name: 'Status', value: status.charAt(0).toUpperCase()+status.slice(1), inline: true },
// { name: 'Filesize', value: prettyBytes(fileSize), inline: true }, // filesize isn't on stream. filesize is on segment. keeping for reference. @todo
{ name: 'URL', value: url, inline: false },
])
if (streamStatus === 'pending') {
if (status === 'pending_recording') {
embeds
.setDescription("Waiting for a worker to accept the job.")
.setColor(2326507)
} else if (streamStatus === 'recording') {
} else if (status === 'recording') {
embeds
.setDescription('The stream is being recorded.')
.setColor(392960)
} else if (streamStatus === 'aborted') {
} else if (status === 'aborted') {
embeds
.setDescription("The recording was stopped by the user.")
.setColor(8289651)
} else if (streamStatus === 'finished') {
} else if (status === 'finished') {
embeds
.setDescription("The recording has ended nominally.")
.setColor(10855845)
} else if (streamStatus === 'failed') {
} else if (status === 'failed') {
embeds
.setDescription("The recording has ended abnorminally.")
.setColor(8289651)
} else if (streamStatus === 'stalled') {
} else if (status === 'stalled') {
embeds
.setDescription("We have not received a progress update in the past two minutes.")
.setColor(8289651)
} else {
embeds
.setDescription(`The recording is in an unknown state? (streamStatus=${streamStatus} this is a bug.)`)
.setDescription(`The recording is in an unknown state? (streamStatus=${status} this is a bug.)`)
.setColor(10855845)
}
// Add an Embed for each segment
if (segments) {
const getDuration = (s: Segment) => formatDuration(intervalToDuration({ start: s.created_at, end: s.updated_at }))
embeds.newEmbed()
.setTitle(`Recording Segments`)
.setFields(segments.map((s, i) => (
{
name: `Segment ${i+1}`,
value: `${getDuration(s)} (${prettyBytes(s.bytes)})`,
inline: false
}
)))
}
return embeds
}
function getButtonRow(streamStatus: Status): ActionRow {
function getButtonRow(streamStatus: Status): ActionRow[] {
const components: ButtonComponent[] = []
if (streamStatus === 'pending' || streamStatus === 'recording') {
const stopButton: ButtonComponent = {
type: MessageComponentTypes.Button,
customId: 'stop',
label: 'Cancel',
style: ButtonStyles.Danger
}
components.push(stopButton)
const yeahButton: ButtonComponent = {
type: MessageComponentTypes.Button,
customId: 'yeah',
label: "Yeah!",
emoji: {
id: yeahEmojiId
},
style: ButtonStyles.Success
}
const processButton: ButtonComponent = {
type: MessageComponentTypes.Button,
customId: 'process',
label: 'Process Recording',
style: ButtonStyles.Success
}
const cancelButton: ButtonComponent = {
type: MessageComponentTypes.Button,
customId: 'cancel',
label: 'Cancel',
style: ButtonStyles.Danger
}
const retryButton: ButtonComponent = {
type: MessageComponentTypes.Button,
customId: 'record',
label: 'Retry Recording',
style: ButtonStyles.Secondary
}
if (streamStatus === 'pending_recording' || streamStatus === 'recording') {
components.push(cancelButton)
components.push(processButton) // @todo this is only for testing. normally the process button is hidden until recording completes.
components.push(yeahButton) // @todo this is only for testing. normally the process button is hidden until recording completes.
} else if (streamStatus === 'aborted') {
const retryButton: ButtonComponent = {
type: MessageComponentTypes.Button,
customId: 'retry',
label: 'Retry Recording',
emoji: {
name: 'retry'
},
style: ButtonStyles.Secondary
}
components.push(retryButton)
} else if (streamStatus === 'finished') {
const downloadButton: ButtonComponent = {
type: MessageComponentTypes.Button,
customId: 'download',
label: 'Download Recording',
emoji: {
id: BigInt('1253191939461873756')
},
style: ButtonStyles.Success
}
components.push(downloadButton)
components.push(processButton)
} else {
const unknownButton: ButtonComponent = {
type: MessageComponentTypes.Button,
customId: 'unknown',
label: 'Unknown Status',
emoji: {
name: 'thinking'
},
style: ButtonStyles.Primary
}
components.push(unknownButton)
components.push(retryButton)
}
@ -197,7 +195,7 @@ function getButtonRow(streamStatus: Status): ActionRow {
components: components as [ButtonComponent]
}
return actionRow
return [actionRow]
}

View File

@ -0,0 +1,123 @@
import type { Task, Helpers } from "graphile-worker"
import { sub } from 'date-fns'
import type { Status } from "@futureporn/types"
import qs from 'qs'
import fetch from 'node-fetch'
import { configs } from '../config.ts'
interface Payload {
stalled_minutes: number;
}
function assertPayload(payload: any): asserts payload is Payload {
if (typeof payload !== "object" || !payload) throw new Error("invalid payload");
if (!payload.stalled_minutes) throw new Error(`stalled_minutes was absent in the payload`);
if (typeof payload.stalled_minutes !== 'number') throw new Error(`stalled_minutes parameter was not a number`);
}
async function updateStalledStreams({
helpers,
stalled_minutes,
url
}: {
helpers: Helpers,
stalled_minutes: number,
url: string
}) {
// 1. identify and update stalled /streams
// Any streams that was updated earlier than n minute ago AND is in 'pending_recording' or 'recording' state is marked as stalled.
const timestamp = sub(new Date(), { minutes: stalled_minutes }).toISOString()
const queryOptions = {
updated_at: `lt.${timestamp}`,
or: '(status.eq.pending_recording,status.eq.recording)'
}
const updatePayload = {
updated_at: new Date().toISOString(),
status: 'stalled' as Status
}
// helpers.logger.info(JSON.stringify(updatePayload))
const query = qs.stringify(queryOptions)
const res = await fetch (`${url}?${query}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${configs.automationUserJwt}`,
'Prefer': 'return=headers-only'
},
body: JSON.stringify(updatePayload)
})
if (!res.ok) {
const body = await res.text()
helpers.logger.info(JSON.stringify(res.headers))
helpers.logger.error(`Response code was not 200. status=${res.status}, statusText=${res.statusText}`)
helpers.logger.error(body)
return;
}
}
async function updateRecordingStreams({
helpers,
url
}: {
helpers: Helpers,
url: string
}) {
// identify and update recording /streams
// Any streams that has a segment that was updated within the past 1 minutes is considered recording
const timestamp = sub(new Date(), { minutes: 1 }).toISOString()
const queryOptions = {
select: 'status,id,segments!inner(updated_at)',
'segments.updated_at': `lt.${timestamp}`,
or: '(status.eq.pending_recording,status.eq.recording)',
}
const updatePayload = {
status: 'recording'
}
// helpers.logger.info(JSON.stringify(updatePayload))
const query = qs.stringify(queryOptions)
const options = {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${configs.automationUserJwt}`,
'Prefer': 'return=headers-only'
},
body: JSON.stringify(updatePayload)
}
const res = await fetch (`${url}?${query}`, options)
if (!res.ok) {
const body = await res.text()
helpers.logger.info(JSON.stringify(res.headers))
helpers.logger.error(`Response code was not 200. status=${res.status}, statusText=${res.statusText}`)
helpers.logger.error(body)
return;
}
}
export const update_stream_statuses: Task = async function (payload: unknown, helpers: Helpers) {
assertPayload(payload)
const { stalled_minutes } = payload
// helpers.logger.info(`update_stream_statuses has begun.`)
const url = 'http://postgrest.futureporn.svc.cluster.local:9000/streams'
try {
// await updateStalledStreams({ helpers, url, stalled_minutes })
await updateRecordingStreams({ helpers, url })
} catch (e: any) {
if (e instanceof Error) {
helpers.logger.error(`hi there we encountered an error while fetching /streams`)
helpers.logger.error(e.message)
} else {
helpers.logger.error(e)
}
}
}
export default update_stream_statuses

View File

@ -31,6 +31,7 @@
"@types/mocha": "^10.0.7",
"@types/qs": "^6.9.15",
"date-fns": "^3.6.0",
"discord.js": "^14.15.3",
"diskusage": "^1.2.0",
"dotenv": "^16.4.5",
"execa": "^6.1.0",

View File

@ -47,6 +47,9 @@ importers:
date-fns:
specifier: ^3.6.0
version: 3.6.0
discord.js:
specifier: ^14.15.3
version: 14.15.3
diskusage:
specifier: ^1.2.0
version: 1.2.0
@ -391,6 +394,34 @@ packages:
'@dabh/diagnostics@2.0.3':
resolution: {integrity: sha512-hrlQOIi7hAfzsMqlGSFyVucrx38O+j6wiGOf//H2ecvIEqYN4ADBSS2iLMh5UFyDunCNniUIPk/q3riFv45xRA==}
'@discordjs/builders@1.8.2':
resolution: {integrity: sha512-6wvG3QaCjtMu0xnle4SoOIeFB4y6fKMN6WZfy3BMKJdQQtPLik8KGzDwBVL/+wTtcE/ZlFjgEk74GublyEVZ7g==}
engines: {node: '>=16.11.0'}
'@discordjs/collection@1.5.3':
resolution: {integrity: sha512-SVb428OMd3WO1paV3rm6tSjM4wC+Kecaa1EUGX7vc6/fddvw/6lg90z4QtCqm21zvVe92vMMDt9+DkIvjXImQQ==}
engines: {node: '>=16.11.0'}
'@discordjs/collection@2.1.0':
resolution: {integrity: sha512-mLcTACtXUuVgutoznkh6hS3UFqYirDYAg5Dc1m8xn6OvPjetnUlf/xjtqnnc47OwWdaoCQnHmHh9KofhD6uRqw==}
engines: {node: '>=18'}
'@discordjs/formatters@0.4.0':
resolution: {integrity: sha512-fJ06TLC1NiruF35470q3Nr1bi95BdvKFAF+T5bNfZJ4bNdqZ3VZ+Ttg6SThqTxm6qumSG3choxLBHMC69WXNXQ==}
engines: {node: '>=16.11.0'}
'@discordjs/rest@2.3.0':
resolution: {integrity: sha512-C1kAJK8aSYRv3ZwMG8cvrrW4GN0g5eMdP8AuN8ODH5DyOCbHgJspze1my3xHOAgwLJdKUbWNVyAeJ9cEdduqIg==}
engines: {node: '>=16.11.0'}
'@discordjs/util@1.1.0':
resolution: {integrity: sha512-IndcI5hzlNZ7GS96RV3Xw1R2kaDuXEp7tRIy/KlhidpN/BQ1qh1NZt3377dMLTa44xDUNKT7hnXkA/oUAzD/lg==}
engines: {node: '>=16.11.0'}
'@discordjs/ws@1.1.1':
resolution: {integrity: sha512-PZ+vLpxGCRtmr2RMkqh8Zp+BenUaJqlS6xhgWKEZcgC/vfHLEzpHtKkB0sl3nZWpwtcKk6YWy+pU3okL2I97FA==}
engines: {node: '>=16.11.0'}
'@esbuild/aix-ppc64@0.21.5':
resolution: {integrity: sha512-1SDgH6ZSPTlggy1yI6+Dbkiz8xzpHJEVAlF/AM1tHPLsf5STom9rwtjE4hKAF20FfXXNTFqEYXyJNWh1GiZedQ==}
engines: {node: '>=12'}
@ -819,6 +850,18 @@ packages:
cpu: [x64]
os: [win32]
'@sapphire/async-queue@1.5.3':
resolution: {integrity: sha512-x7zadcfJGxFka1Q3f8gCts1F0xMwCKbZweM85xECGI0hBTeIZJGGCrHgLggihBoprlQ/hBmDR5LKfIPqnmHM3w==}
engines: {node: '>=v14.0.0', npm: '>=7.0.0'}
'@sapphire/shapeshift@3.9.7':
resolution: {integrity: sha512-4It2mxPSr4OGn4HSQWGmhFMsNFGfFVhWeRPCRwbH972Ek2pzfGRZtb0pJ4Ze6oIzcyh2jw7nUDa6qGlWofgd9g==}
engines: {node: '>=v16'}
'@sapphire/snowflake@3.5.3':
resolution: {integrity: sha512-jjmJywLAFoWeBi1W7994zZyiNWPIiqRRNAmSERxyg93xRGzNYvGjlZ0gR6x0F4gPRi2+0O6S71kOZYyr3cxaIQ==}
engines: {node: '>=v14.0.0', npm: '>=7.0.0'}
'@sinonjs/commons@2.0.0':
resolution: {integrity: sha512-uLa0j859mMrg2slwQYdO/AkrOfmH+X6LTVmNTS9CqexuE2IvVORIkSpJLqePAbEnKJ77aMmCwr1NUZ57120Xcg==}
@ -1106,6 +1149,13 @@ packages:
'@types/triple-beam@1.3.5':
resolution: {integrity: sha512-6WaYesThRMCl19iryMYP7/x2OVgCtbIVflDGFpWnb9irXI3UjYE4AzmYuiUKY1AJstGijoY+MgUszMgRxIYTYw==}
'@types/ws@8.5.12':
resolution: {integrity: sha512-3tPRkv1EtkDpzlgyKyI8pGsGZAGPEaXeu0DOj5DI25Ja91bdAYddYHbADRYVrZMRbfW+1l5YwXVDKohDJNQxkQ==}
'@vladfrangu/async_event_emitter@2.4.5':
resolution: {integrity: sha512-J7T3gUr3Wz0l7Ni1f9upgBZ7+J22/Q1B7dl0X6fG+fTsD+H+31DIosMHj4Um1dWQwqbcQ3oQf+YS2foYkDc9cQ==}
engines: {node: '>=v14.0.0', npm: '>=7.0.0'}
abort-controller@3.0.0:
resolution: {integrity: sha512-h8lQ8tacZYnR3vNQTgibj+tODHI5/+l06Au2Pcriv/Gmet0eaj4TwWH41sO9wnHDiQsEj19q0drzdWdeAHtweg==}
engines: {node: '>=6.5'}
@ -1484,6 +1534,13 @@ packages:
resolution: {integrity: sha512-WkrWp9GR4KXfKGYzOLmTuGVi1UWFfws377n9cc55/tb6DuqyF6pcQ5AbiHEshaDpY9v6oaSr2XCDidGmMwdzIA==}
engines: {node: '>=8'}
discord-api-types@0.37.83:
resolution: {integrity: sha512-urGGYeWtWNYMKnYlZnOnDHm8fVRffQs3U0SpE8RHeiuLKb/u92APS8HoQnPTFbnXmY1vVnXjXO4dOxcAn3J+DA==}
discord.js@14.15.3:
resolution: {integrity: sha512-/UJDQO10VuU6wQPglA4kz2bw2ngeeSbogiIPx/TsnctfzV/tNf+q+i1HlgtX1OGpeOBpJH9erZQNO5oRM2uAtQ==}
engines: {node: '>=16.11.0'}
diskusage@1.2.0:
resolution: {integrity: sha512-2u3OG3xuf5MFyzc4MctNRUKjjwK+UkovRYdD2ed/NZNZPrt0lqHnLKxGhlFVvAb4/oufIgQG3nWgwmeTbHOvXA==}
@ -2051,12 +2108,18 @@ packages:
lodash.isarguments@3.1.0:
resolution: {integrity: sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==}
lodash.snakecase@4.1.1:
resolution: {integrity: sha512-QZ1d4xoBHYUeuouhEq3lk3Uq7ldgyFXGBhg04+oRLnIz8o9T65Eh+8YdroUwn846zchkA9yDsDl5CVVaV2nqYw==}
lodash.sortby@4.7.0:
resolution: {integrity: sha512-HDWXG8isMntAyRF5vZ7xKuEvOhT4AhlRt/3czTSjvGUxjYCBVRQY48ViDHyfYz9VIoBkW4TMGQNapx+l3RUwdA==}
lodash@4.1.0:
resolution: {integrity: sha512-B9sgtKUlz0xe7lkYb80BcOpwwJJw5iOiz4HkBDzF0+i5nJLiwfBnL08m7bBkCOPBfi+0aqvrJDMdZDfAvs8vYg==}
lodash@4.17.21:
resolution: {integrity: sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==}
log-symbols@4.1.0:
resolution: {integrity: sha512-8XPvpAA8uyhfteu8pIvQxpJZ7SYYdpUivZpGy6sFsBuKRY/7rQGavedeB8aK+Zkyq6upMFVL/9AW6vOYzfRyLg==}
engines: {node: '>=10'}
@ -2075,6 +2138,9 @@ packages:
resolution: {integrity: sha512-zobTr7akeGHnv7eBOXcRgMeCP6+uyYsczwmeRCauvpvaAltgNyTbLH/+VaEAPUeWBT+1GuNmz4wC/6jtQzbbVA==}
engines: {node: '>=12'}
magic-bytes.js@1.10.0:
resolution: {integrity: sha512-/k20Lg2q8LE5xiaaSkMXk4sfvI+9EGEykFS4b0CHHGWqDYU0bGUFSwchNOMA56D7TCs9GwVTkqe9als1/ns8UQ==}
make-error@1.3.6:
resolution: {integrity: sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==}
@ -2777,6 +2843,9 @@ packages:
ts-interface-checker@0.1.13:
resolution: {integrity: sha512-Y/arvbn+rrz3JCKl9C4kVNfTfSm2/mEp5FSz5EsZSANGPSlQrpRI5M4PKF+mJnE52jOO90PnPSc3Ur3bTQw0gA==}
ts-mixer@6.0.4:
resolution: {integrity: sha512-ufKpbmrugz5Aou4wcr5Wc1UUFWOLhq+Fm6qa6P0w0K5Qw2yhaUoiWszhCVuNQyNwrlGiscHOmqYoAox1PtvgjA==}
ts-node@10.9.2:
resolution: {integrity: sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==}
hasBin: true
@ -2791,6 +2860,9 @@ packages:
'@swc/wasm':
optional: true
tslib@2.6.2:
resolution: {integrity: sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==}
tslib@2.6.3:
resolution: {integrity: sha512-xNvxJEOUiWPGhUuUdQgAJPKOOJfGnIyKySOc09XkKsgdUV/3E2zvwZYdejjmRgPCgcym1juLH3226yA7sEFJKQ==}
@ -2867,6 +2939,10 @@ packages:
undici-types@5.26.5:
resolution: {integrity: sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==}
undici@6.13.0:
resolution: {integrity: sha512-Q2rtqmZWrbP8nePMq7mOJIN98M0fYvSgV89vwl/BQRT4mDOeY2GXZngfGpcBBhtky3woM7G24wZV3Q304Bv6cw==}
engines: {node: '>=18.0'}
universalify@0.2.0:
resolution: {integrity: sha512-CJ1QgKmNg3CwvAv/kOFmtnEN05f0D/cn9QntgNOQlQF9dgvVTHj3t+8JPdjqawCHk7V/KA+fbUqzZ9XWhcqPUg==}
engines: {node: '>= 4.0.0'}
@ -2950,6 +3026,18 @@ packages:
wrappy@1.0.2:
resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==}
ws@8.18.0:
resolution: {integrity: sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==}
engines: {node: '>=10.0.0'}
peerDependencies:
bufferutil: ^4.0.1
utf-8-validate: '>=5.0.2'
peerDependenciesMeta:
bufferutil:
optional: true
utf-8-validate:
optional: true
xml2js@0.6.2:
resolution: {integrity: sha512-T4rieHaC1EXcES0Kxxj4JWgaUQHDk+qwHcYOCFHfiwKz7tOVPLq7Hjq9dM1WCMhylqMEfP7hMcOIChvotiZegA==}
engines: {node: '>=4.0.0'}
@ -3533,6 +3621,53 @@ snapshots:
enabled: 2.0.0
kuler: 2.0.0
'@discordjs/builders@1.8.2':
dependencies:
'@discordjs/formatters': 0.4.0
'@discordjs/util': 1.1.0
'@sapphire/shapeshift': 3.9.7
discord-api-types: 0.37.83
fast-deep-equal: 3.1.3
ts-mixer: 6.0.4
tslib: 2.6.3
'@discordjs/collection@1.5.3': {}
'@discordjs/collection@2.1.0': {}
'@discordjs/formatters@0.4.0':
dependencies:
discord-api-types: 0.37.83
'@discordjs/rest@2.3.0':
dependencies:
'@discordjs/collection': 2.1.0
'@discordjs/util': 1.1.0
'@sapphire/async-queue': 1.5.3
'@sapphire/snowflake': 3.5.3
'@vladfrangu/async_event_emitter': 2.4.5
discord-api-types: 0.37.83
magic-bytes.js: 1.10.0
tslib: 2.6.3
undici: 6.13.0
'@discordjs/util@1.1.0': {}
'@discordjs/ws@1.1.1':
dependencies:
'@discordjs/collection': 2.1.0
'@discordjs/rest': 2.3.0
'@discordjs/util': 1.1.0
'@sapphire/async-queue': 1.5.3
'@types/ws': 8.5.12
'@vladfrangu/async_event_emitter': 2.4.5
discord-api-types: 0.37.83
tslib: 2.6.3
ws: 8.18.0
transitivePeerDependencies:
- bufferutil
- utf-8-validate
'@esbuild/aix-ppc64@0.21.5':
optional: true
@ -3794,6 +3929,15 @@ snapshots:
'@rollup/rollup-win32-x64-msvc@4.19.1':
optional: true
'@sapphire/async-queue@1.5.3': {}
'@sapphire/shapeshift@3.9.7':
dependencies:
fast-deep-equal: 3.1.3
lodash: 4.17.21
'@sapphire/snowflake@3.5.3': {}
'@sinonjs/commons@2.0.0':
dependencies:
type-detect: 4.0.8
@ -4212,6 +4356,12 @@ snapshots:
'@types/triple-beam@1.3.5': {}
'@types/ws@8.5.12':
dependencies:
'@types/node': 20.14.13
'@vladfrangu/async_event_emitter@2.4.5': {}
abort-controller@3.0.0:
dependencies:
event-target-shim: 5.0.1
@ -4610,6 +4760,26 @@ snapshots:
dependencies:
path-type: 4.0.0
discord-api-types@0.37.83: {}
discord.js@14.15.3:
dependencies:
'@discordjs/builders': 1.8.2
'@discordjs/collection': 1.5.3
'@discordjs/formatters': 0.4.0
'@discordjs/rest': 2.3.0
'@discordjs/util': 1.1.0
'@discordjs/ws': 1.1.1
'@sapphire/snowflake': 3.5.3
discord-api-types: 0.37.83
fast-deep-equal: 3.1.3
lodash.snakecase: 4.1.1
tslib: 2.6.2
undici: 6.13.0
transitivePeerDependencies:
- bufferutil
- utf-8-validate
diskusage@1.2.0:
dependencies:
es6-promise: 4.2.8
@ -5289,10 +5459,14 @@ snapshots:
lodash.isarguments@3.1.0: {}
lodash.snakecase@4.1.1: {}
lodash.sortby@4.7.0: {}
lodash@4.1.0: {}
lodash@4.17.21: {}
log-symbols@4.1.0:
dependencies:
chalk: 4.1.2
@ -5315,6 +5489,8 @@ snapshots:
luxon@3.4.4: {}
magic-bytes.js@1.10.0: {}
make-error@1.3.6: {}
merge-stream@2.0.0: {}
@ -6051,6 +6227,8 @@ snapshots:
ts-interface-checker@0.1.13: {}
ts-mixer@6.0.4: {}
ts-node@10.9.2(@types/node@20.14.13)(typescript@5.5.4):
dependencies:
'@cspotcode/source-map-support': 0.8.1
@ -6069,6 +6247,8 @@ snapshots:
v8-compile-cache-lib: 3.0.1
yn: 3.1.1
tslib@2.6.2: {}
tslib@2.6.3: {}
tsup@8.2.3(tsx@4.16.2)(typescript@5.5.4):
@ -6168,6 +6348,8 @@ snapshots:
undici-types@5.26.5: {}
undici@6.13.0: {}
universalify@0.2.0: {}
url-parse@1.5.10:
@ -6274,6 +6456,8 @@ snapshots:
wrappy@1.0.2: {}
ws@8.18.0: {}
xml2js@0.6.2:
dependencies:
sax: 1.2.1

View File

@ -151,14 +151,15 @@ export default class Record {
console.log('parallelUploads3 is complete.')
} catch (e) {
// if we got an abort error, e.name is not AbortError as expected. Instead, e.name is Error.
// so in order to catch AbortError, we don't even look there. instead, we check if our abortcontroller was aborted.
// in other words, `(e.name === 'AbortError')` will never be true.
if (this.abortSignal.aborted) return;
if (e instanceof Error) {
if (e.name === 'AbortError') {
console.error(`We got an error, AbortError which is something we know how to handle. we will NOT throw and instead return gracefully.`)
return
} else {
console.error(`We were uploading a file to S3 but then we encountered an error! ${JSON.stringify(e, null, 2)}`)
throw e
}
console.error(`We were uploading a file to S3 but then we encountered an exception!`)
console.error(e)
throw e
} else {
throw new Error(`error of some sort ${JSON.stringify(e, null, 2)}`)
}

View File

@ -27,7 +27,9 @@ const preset: GraphileConfig.Preset = {
};
async function api() {
async function doRunApi() {
if (!process.env.PORT) throw new Error('PORT is missing in env');
console.log(`api FUNCTION listening on PORT ${process.env.PORT}`)
const PORT = parseInt(process.env.PORT!)
@ -51,18 +53,42 @@ async function api() {
})
}
async function worker(workerUtils: WorkerUtils) {
async function doRunWorker(workerUtils: WorkerUtils) {
let workerIds: string[] = []
const runnerOptions: RunnerOptions = {
preset,
concurrency,
taskDirectory: join(__dirname, 'tasks'),
// taskList: {
// 'record': record,
// }
}
const runner = await graphileRun(runnerOptions)
if (!runner) throw new Error('failed to initialize graphile worker');
/**
* This is likely only relevant during development.
* if nodemon restarts us, we need to unlock the graphile-worker job so it gets retried immediately by another worker.
*
*/
runner.events.on('worker:create', ({ worker }) => {
// There is no way to get workerIds on demand when the SIGUSR2 comes in, so we log the IDs ahead of time.
workerIds.push(worker.workerId)
})
process.on('SIGUSR2', async () => {
console.warn(`SIGUSR2 detected! ulocking ${workerIds.length} workers, workerIds=${workerIds}`)
await workerUtils.forceUnlockWorkers(workerIds)
process.kill(process.pid, 'SIGTERM');
})
runner.events.on("pool:gracefulShutdown", async ({ workerPool, message }) => {
const workerIds = workerPool._workers.map((w) => w.workerId)
console.warn(`gracefulShutdown detected. releasing job locks on ${workerIds.length} workers, workerIds=${workerIds}, message=${message}`);
await workerUtils.forceUnlockWorkers(workerIds)
});
await runner.promise
}
@ -74,9 +100,9 @@ async function main() {
console.log(`@futureporn/capture version ${version} (FUNCTION=${process.env.FUNCTION})`)
if (process.env.FUNCTION === 'api') {
api()
doRunApi()
} else if (process.env.FUNCTION === 'worker') {
worker(workerUtils)
doRunWorker(workerUtils)
} else {
throw new Error('process.env.FUNCTION must be either api or worker. got '+process.env.FUNCTION)
}
@ -85,5 +111,5 @@ async function main() {
main().catch((err) => {
console.error('there was an error!')
console.error(err);
process.exit(1);
process.exit(874);
});

View File

@ -121,6 +121,8 @@ interface Payload {
stream_id: string;
}
function assertPayload(payload: any): asserts payload is Payload {
if (typeof payload !== "object" || !payload) throw new Error("invalid payload");
if (typeof payload.url !== "string") throw new Error("invalid url");
@ -141,14 +143,8 @@ async function getRecordInstance(url: string, segment_id: number, helpers: Helpe
const inputStream = Record.getFFmpegStream({ url: playlistUrl })
const onProgress = (fileSize: number) => {
updateDatabaseRecord({ segment_id, fileSize, helpers })
.then((reee) => {
helpers.logger.info(JSON.stringify(reee))
return reee
})
.then(checkIfAborted)
.then((isAborted) => {
helpers.logger.info(`isAborted=${isAborted}`)
isAborted ? abortController.abort() : null
})
.catch((e) => {
@ -316,9 +312,11 @@ const doRecordSegment = async function doRecordSegment(url: string, stream_id: s
export const record: Task = async function (payload: unknown, helpers: Helpers) {
assertPayload(payload)
const { url, stream_id } = payload
const recordId = stream_id
const streamId = stream_id
try {
/**
* We do an exponential backoff timer when we record. If the Record() instance throws an error, we try again after a delay.
@ -329,16 +327,31 @@ export const record: Task = async function (payload: unknown, helpers: Helpers)
* @todo We must implement retrying at a higher level, and retry a few times to handle this type of corner-case.
*/
// await backOff(() => doRecordSegment(url, recordId, helpers))
await doRecordSegment(url, recordId, helpers)
await doRecordSegment(url, streamId, helpers)
} catch (e) {
// await updateDatabaseRecord({ recordId: stream_id, recordingState: 'failed' })
helpers.logger.error(`caught an error during record Task`)
if (e instanceof Error) {
helpers.logger.error(e.message)
helpers.logger.info(`error.name=${e.name}`)
if (e.name === 'RoomOfflineError') {
// If room is offline, we want to retry until graphile-worker retries expire.
// We don't want to swallow the error so we simply log the error then let the below throw re-throw the error
// graphile-worker will retry when we re-throw the error below.
helpers.logger.info(`Room is offline.`)
} else if (e.name === 'AbortError') {
// If the recording was aborted by an admin, we want graphile-worker to stop retrying the record job.
// We swallow the error and return in order to mark the job as succeeded.
helpers.logger.info(`>>> we got an AbortError so we are ending the record job.`)
return
} else {
helpers.logger.error(e.message)
}
} else {
helpers.logger.error(JSON.stringify(e))
}
// throw e // @todo uncomment this for production
// we throw the error which fails the graphile-worker job, thus causing graphile-worker to restart/retry the job.
helpers.logger.error(`we got an error during record Task so we throw and retry`)
throw e
}
}

View File

@ -0,0 +1,14 @@
-- instead of using record_id, we need to use stream_id
DROP FUNCTION public.tg__update_discord_message CASCADE;
CREATE FUNCTION public.tg__update_discord_message() RETURNS trigger
LANGUAGE plpgsql SECURITY DEFINER
SET search_path TO 'pg_catalog', 'public', 'pg_temp'
AS $$
begin
PERFORM graphile_worker.add_job('update_discord_message', json_build_object(
'stream_id', NEW.id
), max_attempts := 3);
return NEW;
end;
$$;

View File

@ -0,0 +1,5 @@
-- when a stream is updated, we add a job in graphile to update_discord_message
CREATE TRIGGER stream_update
AFTER UPDATE ON api.streams
FOR EACH ROW
EXECUTE PROCEDURE public.tg__update_discord_message('update_discord_message');

View File

@ -0,0 +1,23 @@
-- in order for discord chatops messages to be updated when a segment is updated,
-- we need to have postgres update the related stream timestamp when a segment is updated.
CREATE OR REPLACE FUNCTION update_stream_on_segment_update()
RETURNS TRIGGER AS $$
BEGIN
UPDATE api.streams
SET updated_at = NOW()
WHERE id IN (
SELECT stream_id
FROM segments_stream_links
WHERE segment_id = NEW.id
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trigger_update_stream
AFTER UPDATE ON api.segments
FOR EACH ROW
EXECUTE FUNCTION update_stream_on_segment_update();