diff --git a/Tiltfile b/Tiltfile index aad992d..252aaba 100644 --- a/Tiltfile +++ b/Tiltfile @@ -432,6 +432,13 @@ k8s_resource( link('https://game-2048.fp.sbtp.xyz/') ] ) +k8s_resource( + workload='whoami', + labels=['frontend'], + links=[ + link('https://whoami.fp.sbtp.xyz/') + ] +) k8s_resource( workload='postgresql-primary', port_forwards=['5432'], diff --git a/charts/fp/templates/capture.yaml b/charts/fp/templates/capture.yaml index 9be7df4..85a66e7 100644 --- a/charts/fp/templates/capture.yaml +++ b/charts/fp/templates/capture.yaml @@ -52,6 +52,11 @@ spec: secretKeyRef: name: postgrest key: automationUserJwt + - name: HTTP_PROXY + valueFrom: + secretKeyRef: + name: capture + key: httpProxy - name: POSTGREST_URL value: "{{ .Values.postgrest.url }}" - name: PORT diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index 5f62fdd..7de9d28 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -6,13 +6,14 @@ declare namespace Futureporn { type PlatformNotificationType = 'email' | 'manual' | 'twitter' type ArchiveStatus = 'good' | 'issue' | 'missing' - type RecordingState = 'pending' | 'recording' | 'stalled' | 'aborted' | 'failed' | 'finished' - type Status = Partial + type RecordingState = 'recording' | 'stalled' | 'aborted' | 'failed' | 'finished' + type Status = Partial<'pending_recording' | RecordingState> interface Stream { id: string; url: string; platform_notification_type: PlatformNotificationType; + discord_message_id: string; date: Date; created_at: Date; updated_at: Date; @@ -23,6 +24,7 @@ declare namespace Futureporn { is_fansly_stream: Boolean; is_recording_aborted: Boolean; status: Status; + segments?: Segment[] } interface RecordingRecord { @@ -41,6 +43,8 @@ declare namespace Futureporn { s3_id: string; bytes: number; stream?: Stream[]; + created_at: Date; + updated_at: Date; } diff --git a/scripts/k8s-secrets.sh b/scripts/k8s-secrets.sh index 8776f7a..65c5a11 100755 --- a/scripts/k8s-secrets.sh +++ b/scripts/k8s-secrets.sh @@ -71,7 +71,8 @@ kubectl --namespace futureporn delete secret capture --ignore-not-found kubectl --namespace futureporn create secret generic capture \ --from-literal=workerConnectionString=${WORKER_CONNECTION_STRING} \ --from-literal=s3AccessKeyId=${S3_USC_BUCKET_KEY_ID} \ ---from-literal=s3SecretAccessKey=${S3_USC_BUCKET_APPLICATION_KEY} +--from-literal=s3SecretAccessKey=${S3_USC_BUCKET_APPLICATION_KEY} \ +--from-literal=httpProxy=${HTTP_PROXY} kubectl --namespace futureporn delete secret mailbox --ignore-not-found kubectl --namespace futureporn create secret generic mailbox \ diff --git a/services/bot/crontab b/services/bot/crontab index b8a176f..b656b85 100644 --- a/services/bot/crontab +++ b/services/bot/crontab @@ -15,4 +15,4 @@ ## every n minutes, we see which /records are stale and we mark them as such. ## this prevents stalled Record updates by marking stalled recordings as stopped -* * * * * expire_stream_recordings ?max=1 { idle_minutes:2 } \ No newline at end of file +* * * * * update_stream_statuses ?max=1 { stalled_minutes:1 } \ No newline at end of file diff --git a/services/bot/src/bot.ts b/services/bot/src/bot.ts index 8190158..983bdd6 100644 --- a/services/bot/src/bot.ts +++ b/services/bot/src/bot.ts @@ -36,6 +36,7 @@ bot.transformers.desiredProperties.interaction.token = true bot.transformers.desiredProperties.interaction.guildId = true bot.transformers.desiredProperties.interaction.member = true bot.transformers.desiredProperties.interaction.message = true +bot.transformers.desiredProperties.interaction.user = true bot.transformers.desiredProperties.message.activity = true bot.transformers.desiredProperties.message.id = true diff --git a/services/bot/src/collector.ts b/services/bot/src/collector.ts new file mode 100644 index 0000000..fd25474 --- /dev/null +++ b/services/bot/src/collector.ts @@ -0,0 +1,14 @@ +import { EventEmitter } from 'node:events' +import type { Interaction } from '@discordeno/bot' + +export class ItemCollector extends EventEmitter { + onItem(callback: (item: Interaction) => unknown): void { + this.on('item', callback) + } + + collect(item: Interaction): void { + this.emit('item', item) + } +} + +export default ItemCollector \ No newline at end of file diff --git a/services/bot/src/commands/cancel.ts b/services/bot/src/commands/cancel.ts new file mode 100644 index 0000000..776f060 --- /dev/null +++ b/services/bot/src/commands/cancel.ts @@ -0,0 +1,51 @@ + +import { ApplicationCommandTypes, type Interaction } from '@discordeno/bot' +import type { Status } from '@futureporn/types' +import { createCommand } from '../commands.ts' +import { bot } from '../bot.ts' +import { configs } from '../config.ts' + +createCommand({ + name: 'cancel', + description: 'Cancel a recording', + type: ApplicationCommandTypes.ChatInput, + async execute(interaction: Interaction) { + bot.logger.info(`cancel command is executing now.`) + const message = interaction.message + if (!message) return bot.logger.error('interaction.message was missing'); + if (!message.id) return bot.logger.error(`interaction.message.id was missing`); + + const url = `${configs.postgrestUrl}/streams?discord_message_id=eq.${message.id}`; + const options = { + method: 'PATCH', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'application/json', + 'Prefer': 'return=representation', + 'Authorization': `Bearer ${configs.automationUserJwt}` + }, + body: JSON.stringify({ + is_recording_aborted: true, + status: 'aborted' as Status + }) + }; + + let streamId: string; + try { + const response = await fetch(url, options); + + bot.logger.info(`response.ok=${response.ok}`) + const data: any = await response.json(); + streamId = data?.at(0).id + bot.logger.info(interaction.user); + interaction.respond(`<@${interaction.user.id}> cancelled recording on Stream ${streamId}`, { isPrivate: false }) + bot.logger.info(`Cancel command successfully ran on message.id=${message.id}`) + + } catch (error) { + bot.logger.error('error encountered while cancelling job') + bot.logger.error(error); + } + + + }, +}) diff --git a/services/bot/src/commands/record.ts b/services/bot/src/commands/record.ts index fb8694c..8ad1e53 100644 --- a/services/bot/src/commands/record.ts +++ b/services/bot/src/commands/record.ts @@ -4,9 +4,11 @@ import { type Interaction, EmbedsBuilder, type InteractionCallbackData, + logger, } from '@discordeno/bot' import { createCommand } from '../commands.ts' import { configs } from '../config.ts' +import type { Stream } from '@futureporn/types' async function createStreamInDatabase(url: string, discordMessageId: string) { @@ -19,15 +21,16 @@ async function createStreamInDatabase(url: string, discordMessageId: string) { method: 'POST', headers: { 'Content-Type': 'application/json', + 'Prefer': 'return=headers-only', 'Authorization': `Bearer ${configs.automationUserJwt}`, - 'Prefer': 'return=headers-only' }, body: JSON.stringify(streamPayload) }) if (!res.ok) { const status = res.status const statusText = res.statusText - const msg = `fetch failed to create stream in database. status=${status}, statusText=${statusText}` + const body = await res.text() + const msg = `failed to create stream in database. status=${status}, statusText=${statusText}, body=${body}` console.error(msg) throw new Error(msg) } @@ -36,6 +39,47 @@ async function createStreamInDatabase(url: string, discordMessageId: string) { return parseInt(id) } +async function getUrlFromMessage(interaction: Interaction): Promise { + const messageId = interaction.message?.id + + const pgRequestUrl = `${configs.postgrestUrl}/streams?discord_message_id=eq.${messageId}` + logger.info(`pgRequestUrl=${pgRequestUrl}`) + const requestOptions = { + method: 'GET', + headers: { + 'Authorization': `Bearer ${configs.automationUserJwt}`, + 'Content-Type': 'application/json', + 'Prefer': 'return=representation' + } + } + try { + const res = await fetch (pgRequestUrl, requestOptions) + if (!res.ok) { + const body = await res.json() + logger.error(body) + throw new Error(`Problem during getOptionsMessage. res.status=${res.status}, res.statusText=${res.statusText}`) + } + const json = await res.json() as Stream[] + const stream = json[0] + const url = stream?.url + if (!url) return null + else return url + } catch (e) { + logger.error(e) + throw e + } + + +} + +async function getUrlFromData(interaction: Interaction): Promise { + if (!interaction) throw new Error('interaction arg passed to getOptions was missing'); + const url = (interaction.data?.options?.find(o => o.name === 'url'))?.value + if (!url) return null; + return String(url) +} + + createCommand({ name: 'record', description: 'Record a livestream.', @@ -48,43 +92,50 @@ createCommand({ }, ], async execute(interaction: Interaction) { + logger.info('logger.info hello? record command is running now`)') await interaction.defer() - // console.log('interation.data as follows') - // console.log(interaction.data) - const options = interaction.data?.options - if (!options) throw new Error(`interaction options was undefined. it's expected to be an array of options.`); - const urlOption = options.find((o) => o.name === 'url') - if (!urlOption) throw new Error(`url option was missing from interaction data`); - const url = ''+urlOption.value - if (!url) throw new Error(`url was missing from interaction data options`); + try { + // The url can come from one of two places. + // interaction.data.options, or interaction.message?.embeds + let url + url = await getUrlFromData(interaction) + logger.info(`getUrlFromData url=${url}`) + if (!url) { + url = await getUrlFromMessage(interaction) + logger.info(`getUrlFromMessage url=${url}`) + } + + logger.info(`url=${url}`) + if (!url) throw new Error('Neither the interaction data nor the message embed contained a URL.'); + - // respond to the interaction and get a message ID which we will then add to the database Record - const embeds = new EmbedsBuilder() - .setTitle(`Record ⋅`) - .setDescription('Waiting for a worker to start the job.') - .setFields([ - { name: 'Status', value: 'Pending', inline: true }, - { name: 'Filesize', value: '0 bytes', inline: true}, - { name: 'URL', value: url, inline: false } - ]) - .setColor('#808080') - - const response: InteractionCallbackData = { embeds } - const message = await interaction.edit(response) + // respond to the interaction and get a message ID which we will then add to the database Record + const embeds = new EmbedsBuilder() + .setTitle(`Stream ⋅`) + .setDescription('Waiting for a worker to start the job.') + .setFields([ + { name: 'Status', value: 'Pending', inline: true }, + { name: 'URL', value: url, inline: true } + ]) + .setColor('#808080') + + const response: InteractionCallbackData = { embeds } + const message = await interaction.edit(response) - // console.log('deferred, interaction message is as follows') - // console.log(message) - if (!message?.id) { - const msg = `message.id was empty, ruh roh raggy` - console.error(msg) - throw new Error(msg) + if (!message?.id) { + const msg = `message.id was empty, ruh roh raggy` + console.error(msg) + throw new Error(msg) + } + + // @todo create stream in db + const stream = await createStreamInDatabase(url, message.id.toString()) + logger.info(stream) + } catch (e) { + await interaction.edit(`Record failed due to the following error.\n${e}`) } - // @todo create record in db - const record = await createStreamInDatabase(url, message.id.toString()) - // console.log(record) - } }) \ No newline at end of file diff --git a/services/bot/src/commands/yeah.ts b/services/bot/src/commands/yeah.ts new file mode 100644 index 0000000..5555aee --- /dev/null +++ b/services/bot/src/commands/yeah.ts @@ -0,0 +1,20 @@ + +import { ApplicationCommandTypes, type Interaction } from '@discordeno/bot' +import { createCommand } from '../commands.ts' +import { bot } from '../bot.ts' + + + +createCommand({ + name: 'yeah', + description: 'Yeah! a message', + type: ApplicationCommandTypes.ChatInput, + async execute(interaction: Interaction) { + // interaction.message.id + const message = interaction.message + if (!message) return bot.logger.error('interaction.message was missing'); + if (!message.id) return bot.logger.error(`interaction.message.id was missing`); + interaction.respond('https://futureporn-b2.b-cdn.net/yeah_nobg.png', { isPrivate: true }) + bot.logger.info(`Yeah! command successfully ran with message.id=${message.id}`) + }, +}) diff --git a/services/bot/src/components/README.md b/services/bot/src/components/README.md new file mode 100644 index 0000000..3dce41a --- /dev/null +++ b/services/bot/src/components/README.md @@ -0,0 +1 @@ +Handlers for Message Component interactions such as button presses \ No newline at end of file diff --git a/services/bot/src/events/interactionCreate.ts b/services/bot/src/events/interactionCreate.ts index 920e61a..a9227c4 100644 --- a/services/bot/src/events/interactionCreate.ts +++ b/services/bot/src/events/interactionCreate.ts @@ -1,22 +1,50 @@ import { InteractionTypes, commandOptionsParser, type Interaction } from '@discordeno/bot' import { bot } from '../bot.ts' -import { commands } from '../commands.ts' +import { commands, type Command } from '../commands.ts' +import ItemCollector from '../collector.ts' -bot.events.interactionCreate = async (interaction: Interaction) => { - if (!interaction.data || interaction.type !== InteractionTypes.ApplicationCommand) return - - const command = commands.get(interaction.data.name) - - if (!command) { - bot.logger.error(`Command ${interaction.data.name} not found`) - return - } +export const collectors = new Set() +const execCommand = async function execCommand(command: Command, interaction: Interaction) { const options = commandOptionsParser(interaction) - + try { await command.execute(interaction, options) } catch (error) { bot.logger.error(`There was an error running the ${command.name} command.`, error) } +} + +const handleApplicationCommand = async function handleApplicationCommand (interaction: Interaction) { + + if (!interaction.data) return + const command = commands.get(interaction.data.name) + + if (!command) { + bot.logger.error(`Command ${interaction.data.name} (customId=${interaction.data.customId}) not found`) + return + } + + execCommand(command, interaction) + +} + +const handleMessageComponent = async function handleMessageComponent (interaction: Interaction) { + if (!interaction.data) return + if (!interaction.data.customId) return + const command = commands.get(interaction.data.customId) + if (!command) return bot.logger.error(`Command ${interaction.data.customId} not found`); + execCommand(command, interaction) +} + +bot.events.interactionCreate = async (interaction: Interaction) => { + + if (interaction.type === InteractionTypes.ApplicationCommand) { + await handleApplicationCommand(interaction) + } else if (interaction.type === InteractionTypes.MessageComponent) { + await handleMessageComponent(interaction) + } else { + bot.logger.info(`received interaction of type=${interaction.type}`) + } + } \ No newline at end of file diff --git a/services/bot/src/old/clientReady.ts b/services/bot/src/old/clientReady.ts deleted file mode 100644 index d15f078..0000000 --- a/services/bot/src/old/clientReady.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { Client, Events, type Interaction } from 'discord.js'; - -export default { - name: Events.ClientReady, - once: true, - execute(client: Client) { - console.log(`Ready! Logged in as ${client?.user?.tag}`); - } -} \ No newline at end of file diff --git a/services/bot/src/old/commands-index.ts b/services/bot/src/old/commands-index.ts deleted file mode 100644 index 8855c9d..0000000 --- a/services/bot/src/old/commands-index.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { type CreateApplicationCommand, type CreateSlashApplicationCommand, type Interaction } from '@discordeno/bot' -import record from '../commands/record.ts' -import donger from '../commands/donger.ts' - -export const commands = new Map( - [ - record, - donger - ].map(cmd => [cmd.name, cmd]), -) - - -export default commands - -export interface Command extends CreateSlashApplicationCommand { - /** Handler that will be executed when this command is triggered */ - execute(interaction: Interaction, args: Record): Promise -} \ No newline at end of file diff --git a/services/bot/src/old/deployCommands.ts b/services/bot/src/old/deployCommands.ts deleted file mode 100644 index 9753f38..0000000 --- a/services/bot/src/old/deployCommands.ts +++ /dev/null @@ -1,25 +0,0 @@ -import 'dotenv/config'; -import { REST, Routes } from 'discord.js'; - -if (!process.env.DISCORD_APPLICATION_ID) throw new Error('DISCORD_APPLICATION_ID was undefined in env'); -if (!process.env.DISCORD_GUILD_ID) throw new Error('DISCORD_GUILD_ID was undefined in env'); -if (!process.env.DISCORD_TOKEN) throw new Error('DISCORD_TOKEN was undefined in env'); - -// Construct and prepare an instance of the REST module -const rest = new REST({ version: '9' }).setToken(process.env.DISCORD_TOKEN); - -export default async function deployCommands(commands: any[]): Promise { - try { - // console.log(`Started refreshing ${commands.length} application (/) commands.`); - // and deploy your commands! - const data: any = await rest.put( - Routes.applicationGuildCommands(process.env.DISCORD_APPLICATION_ID!, process.env.DISCORD_GUILD_ID!), - { body: commands }, - ); - - // console.log(`Successfully reloaded ${data.length} application (/) commands.`); - } catch (error) { - // And of course, make sure you catch and log any errors! - console.error(error); - } -} \ No newline at end of file diff --git a/services/bot/src/old/index.ts b/services/bot/src/old/index.ts deleted file mode 100644 index 9bc1866..0000000 --- a/services/bot/src/old/index.ts +++ /dev/null @@ -1,8 +0,0 @@ -import type { EventHandlers } from '@discordeno/bot' -import { event as interactionCreateEvent } from './interactionCreate.ts.old' - -export const events = { - interactionCreate: interactionCreateEvent, -} as Partial - -export default events \ No newline at end of file diff --git a/services/bot/src/old/interactionCreate.ts.old b/services/bot/src/old/interactionCreate.ts.old deleted file mode 100644 index b9ea694..0000000 --- a/services/bot/src/old/interactionCreate.ts.old +++ /dev/null @@ -1,71 +0,0 @@ -import { Events, type Interaction, Client, Collection } from 'discord.js'; -import type { WorkerUtils } from 'graphile-worker'; - -interface ExtendedClient extends Client { - commands: Collection -} - -export default { - name: Events.InteractionCreate, - once: false, - async execute(interaction: Interaction, workerUtils: WorkerUtils) { - // if (!interaction.isChatInputCommand()) return; - // console.log(interaction.client) - // const command = interaction.client.commands.get(interaction.commandName); - if (interaction.isButton()) { - console.log(`the interaction is a button type with customId=${interaction.customId}, message.id=${interaction.message.id}, user=${interaction.user.id} (${interaction.user.globalName})`) - if (interaction.customId === 'stop') { - interaction.reply(`Stopped by @${interaction.user.id}`) - workerUtils.addJob('stop_recording', { discordMessageId: interaction.message.id, userId: interaction.user.id }, { maxAttempts: 1 }) - } else if (interaction.customId === 'retry') { - interaction.reply(`Retried by @${interaction.user.id}`) - workerUtils.addJob('start_recording', { discordMessageId: interaction.message.id, userId: interaction.user.id }, { maxAttempts: 3 }) - } else { - console.error(`this button's customId=${interaction.customId} did not match one of the known customIds`) - } - - } else if (interaction.isChatInputCommand()) { - console.log(`the interaction is a ChatInputCommandInteraction with commandName=${interaction.commandName}, user=${interaction.user.id} (${interaction.user.globalName})`) - const client = interaction.client as ExtendedClient - const command = client.commands.get(interaction.commandName); - - if (!command) { - console.error(`No command matching ${interaction.commandName} was found.`); - return; - } - - command.execute({ interaction, workerUtils }) - } - - }, -}; - -// const { Events } = require('discord.js'); - -// module.exports = { -// name: Events.ClientReady, -// once: true, -// execute(client) { -// console.log(`Ready! Logged in as ${client.user.tag}`); -// }, -// }; - - - -// client.on(Events.InteractionCreate, interaction => { -// if (interaction.isChatInputCommand()) { -// const { commandName } = interaction; -// console.log(`Received interaction with commandName=${commandName}`) -// const cmd = commands.find((c) => c.data.name === commandName) -// if (!cmd) { -// console.log(`no command handler matches commandName=${commandName}`) -// return; -// } -// cmd.execute({ interaction, workerUtils }) -// } else { -// // probably a ButtonInteraction -// console.log(interaction) -// } -// }); - - diff --git a/services/bot/src/old/loadCommands.ts b/services/bot/src/old/loadCommands.ts deleted file mode 100644 index 694b20a..0000000 --- a/services/bot/src/old/loadCommands.ts +++ /dev/null @@ -1,31 +0,0 @@ -import * as path from 'node:path'; -import * as fs from 'node:fs'; -import { dirname } from 'node:path'; -import { fileURLToPath } from 'url'; -const __dirname = dirname(fileURLToPath(import.meta.url)); - - -export default async function loadCommands(): Promise { - const commands: any[] = []; - // console.log('Grab all the command folders from the commands directory you created earlier') - const foldersPath = path.join(__dirname, 'commands'); - const commandFolders = fs.readdirSync(foldersPath); - - for (const folder of commandFolders) { - const commandsPath = path.join(foldersPath, folder); - const commandFiles = fs.readdirSync(commandsPath).filter(file => file.endsWith('.ts') || file.endsWith('.js')); - console.log(`commandFiles=${commandFiles}`); - // console.log(`Grab the SlashCommandBuilder#toJSON() output of each command's data for deployment`) - for (const file of commandFiles) { - const filePath = path.join(commandsPath, file); - const command = (await import(filePath)).default; - // console.log(command) - if (command?.data && command?.execute) { - commands.push(command); - } else { - console.log(`[WARNING] The command at ${filePath} is missing a required "data" or "execute" property.`); - } - } - } - return commands; -} \ No newline at end of file diff --git a/services/bot/src/old/loadEvents.ts b/services/bot/src/old/loadEvents.ts deleted file mode 100644 index 9a51985..0000000 --- a/services/bot/src/old/loadEvents.ts +++ /dev/null @@ -1,23 +0,0 @@ -import * as path from 'node:path'; -import * as fs from 'node:fs'; -import { dirname } from 'node:path'; -import { fileURLToPath } from 'url'; -import type { Client } from 'discord.js'; -import type { WorkerUtils } from 'graphile-worker'; -const __dirname = dirname(fileURLToPath(import.meta.url)); - -export default async function loadEvents(client: Client, workerUtils: WorkerUtils) { - console.log(`loading events`); - const eventsPath = path.join(__dirname, 'events'); - const eventFiles = fs.readdirSync(eventsPath).filter(file => file.endsWith('.ts') || file.endsWith('.js')); - console.log(`eventFiles=${eventFiles}`); - for (const file of eventFiles) { - const filePath = path.join(eventsPath, file); - const event = (await import(filePath)).default; - if (event.once) { - client.once(event.name, (...args) => event.execute(...args, workerUtils)); - } else { - client.on(event.name, (...args) => event.execute(...args, workerUtils)); - } - } -} \ No newline at end of file diff --git a/services/bot/src/old/messageReactionAdd.ts b/services/bot/src/old/messageReactionAdd.ts deleted file mode 100644 index 7621b45..0000000 --- a/services/bot/src/old/messageReactionAdd.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { Client, Events, MessageReaction, User, type Interaction } from 'discord.js'; - -export default { - name: Events.MessageReactionAdd, - once: false, - async execute(reaction: MessageReaction, user: User) { - // When a reaction is received, check if the structure is partial - if (reaction.partial) { - // If the message this reaction belongs to was removed, the fetching might result in an API error which should be handled - try { - await reaction.fetch(); - } catch (error) { - console.error('Something went wrong when fetching the message:', error); - // Return as `reaction.message.author` may be undefined/null - return; - } - } - - // Now the message has been cached and is fully available - console.log(`${reaction.message.author}'s message "${reaction.message.content}" gained a reaction!`); - // The reaction is now also fully available and the properties will be reflected accurately: - console.log(`${reaction.count} user(s) have given the same reaction to this message!`); - } -} \ No newline at end of file diff --git a/services/bot/src/old/register-commands.ts b/services/bot/src/old/register-commands.ts deleted file mode 100644 index b636f91..0000000 --- a/services/bot/src/old/register-commands.ts +++ /dev/null @@ -1,12 +0,0 @@ -import 'dotenv/config' -import { bot } from '../index.js' -import donger from '../commands/donger.js' -import record from '../commands/record.js' - -const guildId = process.env.DISCORD_GUILD_ID! -const commands = [ - donger, - record -] - -await bot.rest.upsertGuildApplicationCommands(guildId, commands) \ No newline at end of file diff --git a/services/bot/src/old/utilities/donger.ts b/services/bot/src/old/utilities/donger.ts deleted file mode 100644 index e86a524..0000000 --- a/services/bot/src/old/utilities/donger.ts +++ /dev/null @@ -1,45 +0,0 @@ -import { type ChatInputCommandInteraction, SlashCommandBuilder, Message } from 'discord.js'; - - -const dongers: string[] = [ - '( ͡ᵔ ͜ʖ ͡ᵔ )', - '¯\_(ツ)_/¯', - '(๑>ᴗ<๑)', - '(̿▀̿ ̿Ĺ̯̿̿▀̿ ̿)', - '( ͡° ͜ʖ ͡°)', - '٩(͡๏̯͡๏)۶', - 'ლ(´◉❥◉`ლ)', - '( ゚Д゚)', - 'ԅ( ͒ ۝ ͒ )ᕤ', - '( ͡ᵔ ͜ʖ ͡°)', - '( ͠° ͟ʖ ͡°)╭∩╮', - '༼ つ ❦౪❦ ༽つ', - '( ͡↑ ͜ʖ ͡↑)', - '(ভ_ ভ) ރ // ┊ \\', - 'ヽ(⌐□益□)ノ', - '༼ つ ◕‿◕ ༽つ', - 'ヽ(⚆෴⚆)ノ', - '(つ .•́ _ʖ •̀.)つ', - '༼⌐■ل͟■༽', - '┬─┬ノ( ͡° ͜ʖ ͡°ノ)', - '༼⁰o⁰;༽꒳ᵒ꒳ᵎᵎᵎ', - '( -_・) ▄︻̷̿┻̿═━一', - '【 º ᗜ º 】', - 'ᕦ(✧╭╮✧)ᕥ', - '┗( T﹏T )┛', - '(Φ ᆺ Φ)', - '(TдT)', - '☞(◉▽◉)☞' - ]; - - -export default { - data: new SlashCommandBuilder() - .setName('donger') - .setDescription('Replies with a free donger!'), - async execute({ interaction }: { interaction: ChatInputCommandInteraction}): Promise { - await interaction.reply({ - content: dongers[Math.floor(Math.random()*dongers.length)] - }); - }, -}; diff --git a/services/bot/src/old/utilities/record.ts b/services/bot/src/old/utilities/record.ts deleted file mode 100644 index 0530b49..0000000 --- a/services/bot/src/old/utilities/record.ts +++ /dev/null @@ -1,144 +0,0 @@ - -import type { ExecuteArguments } from '../../index.js'; - - -if (!process.env.AUTOMATION_USER_JWT) throw new Error(`AUTOMATION_USER_JWT was missing from env`); - -export default { - data: new SlashCommandBuilder() - .setName('record') - .setDescription('Record a livestream.') - .addStringOption((option) => - option.setName('url') - .setMaxLength(1024) - .setDescription('The channel URL to record') - .setRequired(true) - ), - async execute({ interaction, workerUtils }: ExecuteArguments): Promise { - const url = interaction.options.getString('url') - - // const row = new ActionRowBuilder() - // .addComponents(component); - - - // { - // content: `Button`, - // components: [ - // new ActionRowBuilder().addComponents([ - // new ButtonBuilder() - // .setCustomId('click/12345') - // .setLabel('LABEL') - // .setStyle(ButtonStyle.Primary) - // ]) - // ] - - // cols can be 5 high - // rows can be 5 wide - const statusEmbed = new EmbedBuilder() - .setTitle('Pending') - .setDescription('Waiting for a worker to accept the job.') - .setColor(2326507) - - const buttonRow = new ActionRowBuilder() - .addComponents([ - new ButtonBuilder() - .setCustomId('stop') - .setLabel('Stop Recording') - .setEmoji('🛑') - .setStyle(ButtonStyle.Danger), - ]); - - // const embed = new EmbedBuilder().setTitle('Attachments'); - - - const idk = await interaction.reply({ - content: `/record ${url}`, - embeds: [ - statusEmbed - ], - components: [ - buttonRow - ] - }); - - // console.log('the following is idk, the return value from interaction.reply') - // console.log(idk) - - const message = await idk.fetch() - const discordMessageId = message.id - await workerUtils.addJob('start_recording', { url, discordMessageId }, { maxAttempts: 3 }) - - }, -}; - - -/** -{ - "content": "https://chaturbate.com/projektmelody", - "tts": false, - "embeds": [ - { - "id": 652627557, - "title": "Pending", - "description": "Waiting for a worker to accept the job.", - "color": 2326507, - "fields": [] - }, - { - "id": 88893690, - "title": "Recording", - "description": "The stream is being recorded.", - "color": 392960, - "fields": [] - }, - { - "id": 118185075, - "title": "Aborted", - "description": "The recording was stopped by the user.", - "color": 8289651, - "fields": [] - }, - { - "id": 954884517, - "title": "Ended", - "description": "The recording has stopped.", - "color": 10855845, - "fields": [] - }, - { - "id": 64407340, - "description": "", - "fields": [], - "image": { - "url": "https://futureporn-b2.b-cdn.net/ti8ht9bgwj6k783j7hglfg8j_projektmelody-chaturbate-2024-07-18.png" - } - } - ], - "components": [ - { - "id": 300630266, - "type": 1, - "components": [ - { - "id": 320918638, - "type": 2, - "style": 4, - "label": "Stop Recording", - "action_set_id": "407606538", - "emoji": { - "name": "🛑", - "animated": false - } - } - ] - } - ], - "actions": { - "407606538": { - "actions": [] - } - }, - "username": "@futureporn/capture", - "avatar_url": "https://cdn.discordapp.com/avatars/1081818302344597506/93892e06c2f94c3ef1043732a49856db.webp?size=128" -} -*/ \ No newline at end of file diff --git a/services/bot/src/old/utilities/simEmail.ts b/services/bot/src/old/utilities/simEmail.ts deleted file mode 100644 index 86e1379..0000000 --- a/services/bot/src/old/utilities/simEmail.ts +++ /dev/null @@ -1,14 +0,0 @@ -import { type ChatInputCommandInteraction, SlashCommandBuilder } from 'discord.js'; - - -export default { - data: new SlashCommandBuilder() - .setName('sim-email') - .setDescription('Simulate an incoming platform notification e-mail'), - async execute(interaction: ChatInputCommandInteraction): Promise { - - await interaction.reply({ - content: 'testing 123 this is sim-email (simEmail.ts)' - }); - }, -}; diff --git a/services/bot/src/tasks/expire_stream_recordings.ts b/services/bot/src/tasks/expire_stream_recordings.ts deleted file mode 100644 index dfac5c0..0000000 --- a/services/bot/src/tasks/expire_stream_recordings.ts +++ /dev/null @@ -1,72 +0,0 @@ -import type { Task, Helpers } from "graphile-worker" -import { sub } from 'date-fns' -import type { RecordingRecord, Stream } from "@futureporn/types" -import qs from 'qs' -import fetch from 'node-fetch' -import { configs } from '../config.ts' - -interface Payload { - idle_minutes: number; -} - -function assertPayload(payload: any): asserts payload is Payload { - if (typeof payload !== "object" || !payload) throw new Error("invalid payload"); - if (!payload.idle_minutes) throw new Error(`idle_minutes was absent in the payload`); - if (typeof payload.idle_minutes !== 'number') throw new Error(`idle_minutes parameter was not a number`); -} - -export const expire_stream_recordings: Task = async function (payload: unknown, helpers: Helpers) { - assertPayload(payload) - const { idle_minutes } = payload - helpers.logger.info(`expire_stream_recordings has begun. Expring 'recording' and 'pending' streams that haven't been updated in ${idle_minutes} minutes.`) - - const url = 'http://postgrest.futureporn.svc.cluster.local:9000/streams' - let streams: Stream[] = [] - - try { - // 1. identify and update stalled /streams - // Any streams that was updated earlier than n minute ago AND is in 'pending_recording' or 'recording' state is marked as stalled. - const timestamp = sub(new Date(), { minutes: idle_minutes }).toISOString() - const queryOptions = { - updated_at: `lt.${timestamp}`, - or: '(status.eq.pending_recording,status.eq.recording)' - } - const updatePayload = { - updated_at: new Date().toISOString(), - status: 'stalled' - } - helpers.logger.info(JSON.stringify(updatePayload)) - const query = qs.stringify(queryOptions) - const res = await fetch (`${url}?${query}`, { - method: 'PATCH', - headers: { - 'Content-Type': 'application/json', - 'Authorization': `Bearer ${configs.automationUserJwt}`, - 'Prefer': 'return=headers-only' - }, - body: JSON.stringify(updatePayload) - }) - if (!res.ok) { - const body = await res.text() - helpers.logger.info(JSON.stringify(res.headers)) - helpers.logger.error(`Response code was not 200. status=${res.status}, statusText=${res.statusText}`) - helpers.logger.error(body) - return; - } - - const body = await res.text() - helpers.logger.info('body as follows') - helpers.logger.info(body) - - } catch (e: any) { - if (e instanceof Error) { - helpers.logger.error(`hi there we encountered an error while fetching /streams`) - helpers.logger.error(e.message) - } else { - helpers.logger.error(e) - } - - } -} - -export default expire_stream_recordings \ No newline at end of file diff --git a/services/bot/src/tasks/update_discord_message.ts b/services/bot/src/tasks/update_discord_message.ts index 012e53e..77407dc 100644 --- a/services/bot/src/tasks/update_discord_message.ts +++ b/services/bot/src/tasks/update_discord_message.ts @@ -1,7 +1,7 @@ import 'dotenv/config' -import type { Status } from '@futureporn/types' +import type { Status, Stream, Segment } from '@futureporn/types' import { type Task, type Helpers } from 'graphile-worker' -import { add } from 'date-fns' +import { intervalToDuration, formatDuration, isBefore, sub, max } from 'date-fns' import prettyBytes from 'pretty-bytes' import { EmbedsBuilder, @@ -9,14 +9,14 @@ import { type ActionRow, MessageComponentTypes, type ButtonComponent, - type InputTextComponent, type EditMessage, - type Message, - type Embed } from '@discordeno/bot' import { bot } from '../bot.ts' import { configs } from '../config.ts' +const yeahEmojiId = BigInt('1253191939461873756') + + interface Payload { stream_id: number; } @@ -31,25 +31,17 @@ function assertPayload(payload: any): asserts payload is Payload { -async function editDiscordMessage({ helpers, streamStatus, discordMessageId, url, fileSize, streamId }: { streamId: number, fileSize: number, url: string, helpers: Helpers, streamStatus: Status, discordMessageId: string }) { +async function editDiscordMessage({ helpers, stream }: { stream: Stream, helpers: Helpers }) { + const discordMessageId = stream.discord_message_id if (!discordMessageId) throw new Error(`discordMessageId was missing!`); if (typeof discordMessageId !== 'string') throw new Error(`discordMessageId was not a string!`); - - // const { captureJobId } = job.data - helpers.logger.info(`editDiscordMessage has begun with discordMessageId=${discordMessageId}, streamStatus=${streamStatus}`) - - - // const guild = await bot.cache.guilds.get(BigInt(configs.discordGuildId)) - // const channel = guild?.channels.get(BigInt(configs.discordChannelId)) - - // // const channel = await bot.cache.channels.get() - // console.log('channel as follows') - // console.log(channel) const channelId = BigInt(configs.discordChannelId) + const updatedMessage: EditMessage = { - embeds: getStatusEmbed({ streamStatus, fileSize, streamId, url }), + embeds: getEmbeds(stream), + components: getButtonRow(stream.status) } bot.helpers.editMessage(channelId, discordMessageId, updatedMessage) @@ -59,7 +51,7 @@ async function editDiscordMessage({ helpers, streamStatus, discordMessageId, url async function getStreamFromDatabase(streamId: number) { - const res = await fetch(`${process.env.POSTGREST_URL}/streams?select=*,segment:segments(*)&id=eq.${streamId}`) + const res = await fetch(`${process.env.POSTGREST_URL}/streams?select=*,segments(*)&id=eq.${streamId}`) if (!res.ok) { throw new Error(`failed fetching stream ${streamId}. status=${res.status}, statusText=${res.statusText}`) } @@ -74,7 +66,6 @@ async function getStreamFromDatabase(streamId: number) { * the most up-to-date status information from the database * * Sometimes the update is changing the state, one of Pending|Recording|Aborted|Ended. - * Sometimes the update is updating the Filesize of the recording in-progress * Sometimes the update is adding a thumbnail image to the message */ export const update_discord_message: Task = async function (payload, helpers: Helpers) { @@ -82,113 +73,120 @@ export const update_discord_message: Task = async function (payload, helpers: He assertPayload(payload) const { stream_id } = payload const streamId = stream_id - helpers.logger.info(`update_discord_message() with streamId=${streamId}`) + const stream = await getStreamFromDatabase(streamId) - const { discord_message_id, status, file_size, url } = stream - const streamStatus = status - const discordMessageId = discord_message_id - const fileSize = file_size - editDiscordMessage({ helpers, streamStatus, discordMessageId, url, fileSize, streamId }) - // schedule the next update 10s from now, but only if the recording is still happening - if (streamStatus !== 'ended') { - const runAt = add(new Date(), { seconds: 10 }) - const streamId = stream.id - await helpers.addJob('update_discord_message', { streamId }, { jobKey: `stream_${streamId}_update_discord_message`, maxAttempts: 3, runAt }) - } + // helpers.logger.info(`update_discord_message with streamId=${streamId}. stream=${JSON.stringify(stream)}`) + editDiscordMessage({ helpers, stream }) } catch (e) { helpers.logger.error(`caught an error during update_discord_message. e=${e}`) } } -function getStatusEmbed({ - streamStatus, streamId, fileSize, url -}: { fileSize: number, streamStatus: Status, streamId: number, url: string }) { + +function getEmbeds(stream: Stream) { + const streamId = stream.id + const url = stream.url + const segments = stream?.segments + const status = stream.status const embeds = new EmbedsBuilder() .setTitle(`Stream ${streamId}`) .setFields([ - { name: 'Status', value: streamStatus.charAt(0).toUpperCase()+streamStatus.slice(1), inline: true }, - { name: 'Filesize', value: prettyBytes(fileSize), inline: true }, + { name: 'Status', value: status.charAt(0).toUpperCase()+status.slice(1), inline: true }, + // { name: 'Filesize', value: prettyBytes(fileSize), inline: true }, // filesize isn't on stream. filesize is on segment. keeping for reference. @todo { name: 'URL', value: url, inline: false }, ]) - if (streamStatus === 'pending') { + if (status === 'pending_recording') { embeds .setDescription("Waiting for a worker to accept the job.") .setColor(2326507) - } else if (streamStatus === 'recording') { + } else if (status === 'recording') { embeds .setDescription('The stream is being recorded.') .setColor(392960) - } else if (streamStatus === 'aborted') { + } else if (status === 'aborted') { embeds .setDescription("The recording was stopped by the user.") .setColor(8289651) - } else if (streamStatus === 'finished') { + } else if (status === 'finished') { embeds .setDescription("The recording has ended nominally.") .setColor(10855845) - } else if (streamStatus === 'failed') { + } else if (status === 'failed') { embeds .setDescription("The recording has ended abnorminally.") .setColor(8289651) - } else if (streamStatus === 'stalled') { + } else if (status === 'stalled') { embeds .setDescription("We have not received a progress update in the past two minutes.") .setColor(8289651) } else { embeds - .setDescription(`The recording is in an unknown state? (streamStatus=${streamStatus} this is a bug.)`) + .setDescription(`The recording is in an unknown state? (streamStatus=${status} this is a bug.)`) .setColor(10855845) } + + // Add an Embed for each segment + if (segments) { + const getDuration = (s: Segment) => formatDuration(intervalToDuration({ start: s.created_at, end: s.updated_at })) + embeds.newEmbed() + .setTitle(`Recording Segments`) + .setFields(segments.map((s, i) => ( + { + name: `Segment ${i+1}`, + value: `${getDuration(s)} (${prettyBytes(s.bytes)})`, + inline: false + } + ))) + } return embeds } -function getButtonRow(streamStatus: Status): ActionRow { +function getButtonRow(streamStatus: Status): ActionRow[] { const components: ButtonComponent[] = [] - if (streamStatus === 'pending' || streamStatus === 'recording') { - const stopButton: ButtonComponent = { - type: MessageComponentTypes.Button, - customId: 'stop', - label: 'Cancel', - style: ButtonStyles.Danger - } - components.push(stopButton) + const yeahButton: ButtonComponent = { + type: MessageComponentTypes.Button, + customId: 'yeah', + label: "Yeah!", + emoji: { + id: yeahEmojiId + }, + style: ButtonStyles.Success + } + const processButton: ButtonComponent = { + type: MessageComponentTypes.Button, + customId: 'process', + label: 'Process Recording', + style: ButtonStyles.Success + } + const cancelButton: ButtonComponent = { + type: MessageComponentTypes.Button, + customId: 'cancel', + label: 'Cancel', + style: ButtonStyles.Danger + } + + const retryButton: ButtonComponent = { + type: MessageComponentTypes.Button, + customId: 'record', + label: 'Retry Recording', + style: ButtonStyles.Secondary + } + + + if (streamStatus === 'pending_recording' || streamStatus === 'recording') { + components.push(cancelButton) + components.push(processButton) // @todo this is only for testing. normally the process button is hidden until recording completes. + components.push(yeahButton) // @todo this is only for testing. normally the process button is hidden until recording completes. } else if (streamStatus === 'aborted') { - const retryButton: ButtonComponent = { - type: MessageComponentTypes.Button, - customId: 'retry', - label: 'Retry Recording', - emoji: { - name: 'retry' - }, - style: ButtonStyles.Secondary - } components.push(retryButton) } else if (streamStatus === 'finished') { - const downloadButton: ButtonComponent = { - type: MessageComponentTypes.Button, - customId: 'download', - label: 'Download Recording', - emoji: { - id: BigInt('1253191939461873756') - }, - style: ButtonStyles.Success - } - components.push(downloadButton) + components.push(processButton) } else { - const unknownButton: ButtonComponent = { - type: MessageComponentTypes.Button, - customId: 'unknown', - label: 'Unknown Status', - emoji: { - name: 'thinking' - }, - style: ButtonStyles.Primary - } - components.push(unknownButton) + components.push(retryButton) } @@ -197,7 +195,7 @@ function getButtonRow(streamStatus: Status): ActionRow { components: components as [ButtonComponent] } - return actionRow + return [actionRow] } diff --git a/services/bot/src/tasks/update_stream_statuses.ts b/services/bot/src/tasks/update_stream_statuses.ts new file mode 100644 index 0000000..4f8c66c --- /dev/null +++ b/services/bot/src/tasks/update_stream_statuses.ts @@ -0,0 +1,123 @@ +import type { Task, Helpers } from "graphile-worker" +import { sub } from 'date-fns' +import type { Status } from "@futureporn/types" +import qs from 'qs' +import fetch from 'node-fetch' +import { configs } from '../config.ts' + +interface Payload { + stalled_minutes: number; +} + +function assertPayload(payload: any): asserts payload is Payload { + if (typeof payload !== "object" || !payload) throw new Error("invalid payload"); + if (!payload.stalled_minutes) throw new Error(`stalled_minutes was absent in the payload`); + if (typeof payload.stalled_minutes !== 'number') throw new Error(`stalled_minutes parameter was not a number`); +} + +async function updateStalledStreams({ + helpers, + stalled_minutes, + url +}: { + helpers: Helpers, + stalled_minutes: number, + url: string +}) { + + // 1. identify and update stalled /streams + // Any streams that was updated earlier than n minute ago AND is in 'pending_recording' or 'recording' state is marked as stalled. + const timestamp = sub(new Date(), { minutes: stalled_minutes }).toISOString() + const queryOptions = { + updated_at: `lt.${timestamp}`, + or: '(status.eq.pending_recording,status.eq.recording)' + } + const updatePayload = { + updated_at: new Date().toISOString(), + status: 'stalled' as Status + } + // helpers.logger.info(JSON.stringify(updatePayload)) + const query = qs.stringify(queryOptions) + const res = await fetch (`${url}?${query}`, { + method: 'PATCH', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${configs.automationUserJwt}`, + 'Prefer': 'return=headers-only' + }, + body: JSON.stringify(updatePayload) + }) + if (!res.ok) { + const body = await res.text() + helpers.logger.info(JSON.stringify(res.headers)) + helpers.logger.error(`Response code was not 200. status=${res.status}, statusText=${res.statusText}`) + helpers.logger.error(body) + return; + } + +} + +async function updateRecordingStreams({ + helpers, + url +}: { + helpers: Helpers, + url: string +}) { + + // identify and update recording /streams + // Any streams that has a segment that was updated within the past 1 minutes is considered recording + const timestamp = sub(new Date(), { minutes: 1 }).toISOString() + const queryOptions = { + select: 'status,id,segments!inner(updated_at)', + 'segments.updated_at': `lt.${timestamp}`, + or: '(status.eq.pending_recording,status.eq.recording)', + } + const updatePayload = { + status: 'recording' + } + // helpers.logger.info(JSON.stringify(updatePayload)) + const query = qs.stringify(queryOptions) + const options = { + method: 'PATCH', + headers: { + 'Content-Type': 'application/json', + 'Authorization': `Bearer ${configs.automationUserJwt}`, + 'Prefer': 'return=headers-only' + }, + body: JSON.stringify(updatePayload) + } + const res = await fetch (`${url}?${query}`, options) + if (!res.ok) { + const body = await res.text() + helpers.logger.info(JSON.stringify(res.headers)) + helpers.logger.error(`Response code was not 200. status=${res.status}, statusText=${res.statusText}`) + helpers.logger.error(body) + return; + } + +} + +export const update_stream_statuses: Task = async function (payload: unknown, helpers: Helpers) { + assertPayload(payload) + const { stalled_minutes } = payload + // helpers.logger.info(`update_stream_statuses has begun.`) + + const url = 'http://postgrest.futureporn.svc.cluster.local:9000/streams' + + try { + // await updateStalledStreams({ helpers, url, stalled_minutes }) + await updateRecordingStreams({ helpers, url }) + + } catch (e: any) { + if (e instanceof Error) { + helpers.logger.error(`hi there we encountered an error while fetching /streams`) + helpers.logger.error(e.message) + } else { + helpers.logger.error(e) + } + + } +} + +export default update_stream_statuses \ No newline at end of file diff --git a/services/capture/package.json b/services/capture/package.json index b72df9a..da74a27 100644 --- a/services/capture/package.json +++ b/services/capture/package.json @@ -31,6 +31,7 @@ "@types/mocha": "^10.0.7", "@types/qs": "^6.9.15", "date-fns": "^3.6.0", + "discord.js": "^14.15.3", "diskusage": "^1.2.0", "dotenv": "^16.4.5", "execa": "^6.1.0", diff --git a/services/capture/pnpm-lock.yaml b/services/capture/pnpm-lock.yaml index cad0018..8786e41 100644 --- a/services/capture/pnpm-lock.yaml +++ b/services/capture/pnpm-lock.yaml @@ -47,6 +47,9 @@ importers: date-fns: specifier: ^3.6.0 version: 3.6.0 + discord.js: + specifier: ^14.15.3 + version: 14.15.3 diskusage: specifier: ^1.2.0 version: 1.2.0 @@ -391,6 +394,34 @@ packages: '@dabh/diagnostics@2.0.3': resolution: {integrity: sha512-hrlQOIi7hAfzsMqlGSFyVucrx38O+j6wiGOf//H2ecvIEqYN4ADBSS2iLMh5UFyDunCNniUIPk/q3riFv45xRA==} + '@discordjs/builders@1.8.2': + resolution: {integrity: sha512-6wvG3QaCjtMu0xnle4SoOIeFB4y6fKMN6WZfy3BMKJdQQtPLik8KGzDwBVL/+wTtcE/ZlFjgEk74GublyEVZ7g==} + engines: {node: '>=16.11.0'} + + '@discordjs/collection@1.5.3': + resolution: {integrity: sha512-SVb428OMd3WO1paV3rm6tSjM4wC+Kecaa1EUGX7vc6/fddvw/6lg90z4QtCqm21zvVe92vMMDt9+DkIvjXImQQ==} + engines: {node: '>=16.11.0'} + + '@discordjs/collection@2.1.0': + resolution: {integrity: sha512-mLcTACtXUuVgutoznkh6hS3UFqYirDYAg5Dc1m8xn6OvPjetnUlf/xjtqnnc47OwWdaoCQnHmHh9KofhD6uRqw==} + engines: {node: '>=18'} + + '@discordjs/formatters@0.4.0': + resolution: {integrity: sha512-fJ06TLC1NiruF35470q3Nr1bi95BdvKFAF+T5bNfZJ4bNdqZ3VZ+Ttg6SThqTxm6qumSG3choxLBHMC69WXNXQ==} + engines: {node: '>=16.11.0'} + + '@discordjs/rest@2.3.0': + resolution: {integrity: sha512-C1kAJK8aSYRv3ZwMG8cvrrW4GN0g5eMdP8AuN8ODH5DyOCbHgJspze1my3xHOAgwLJdKUbWNVyAeJ9cEdduqIg==} + engines: {node: '>=16.11.0'} + + '@discordjs/util@1.1.0': + resolution: {integrity: sha512-IndcI5hzlNZ7GS96RV3Xw1R2kaDuXEp7tRIy/KlhidpN/BQ1qh1NZt3377dMLTa44xDUNKT7hnXkA/oUAzD/lg==} + engines: {node: '>=16.11.0'} + + '@discordjs/ws@1.1.1': + resolution: {integrity: sha512-PZ+vLpxGCRtmr2RMkqh8Zp+BenUaJqlS6xhgWKEZcgC/vfHLEzpHtKkB0sl3nZWpwtcKk6YWy+pU3okL2I97FA==} + engines: {node: '>=16.11.0'} + '@esbuild/aix-ppc64@0.21.5': resolution: {integrity: sha512-1SDgH6ZSPTlggy1yI6+Dbkiz8xzpHJEVAlF/AM1tHPLsf5STom9rwtjE4hKAF20FfXXNTFqEYXyJNWh1GiZedQ==} engines: {node: '>=12'} @@ -819,6 +850,18 @@ packages: cpu: [x64] os: [win32] + '@sapphire/async-queue@1.5.3': + resolution: {integrity: sha512-x7zadcfJGxFka1Q3f8gCts1F0xMwCKbZweM85xECGI0hBTeIZJGGCrHgLggihBoprlQ/hBmDR5LKfIPqnmHM3w==} + engines: {node: '>=v14.0.0', npm: '>=7.0.0'} + + '@sapphire/shapeshift@3.9.7': + resolution: {integrity: sha512-4It2mxPSr4OGn4HSQWGmhFMsNFGfFVhWeRPCRwbH972Ek2pzfGRZtb0pJ4Ze6oIzcyh2jw7nUDa6qGlWofgd9g==} + engines: {node: '>=v16'} + + '@sapphire/snowflake@3.5.3': + resolution: {integrity: sha512-jjmJywLAFoWeBi1W7994zZyiNWPIiqRRNAmSERxyg93xRGzNYvGjlZ0gR6x0F4gPRi2+0O6S71kOZYyr3cxaIQ==} + engines: {node: '>=v14.0.0', npm: '>=7.0.0'} + '@sinonjs/commons@2.0.0': resolution: {integrity: sha512-uLa0j859mMrg2slwQYdO/AkrOfmH+X6LTVmNTS9CqexuE2IvVORIkSpJLqePAbEnKJ77aMmCwr1NUZ57120Xcg==} @@ -1106,6 +1149,13 @@ packages: '@types/triple-beam@1.3.5': resolution: {integrity: sha512-6WaYesThRMCl19iryMYP7/x2OVgCtbIVflDGFpWnb9irXI3UjYE4AzmYuiUKY1AJstGijoY+MgUszMgRxIYTYw==} + '@types/ws@8.5.12': + resolution: {integrity: sha512-3tPRkv1EtkDpzlgyKyI8pGsGZAGPEaXeu0DOj5DI25Ja91bdAYddYHbADRYVrZMRbfW+1l5YwXVDKohDJNQxkQ==} + + '@vladfrangu/async_event_emitter@2.4.5': + resolution: {integrity: sha512-J7T3gUr3Wz0l7Ni1f9upgBZ7+J22/Q1B7dl0X6fG+fTsD+H+31DIosMHj4Um1dWQwqbcQ3oQf+YS2foYkDc9cQ==} + engines: {node: '>=v14.0.0', npm: '>=7.0.0'} + abort-controller@3.0.0: resolution: {integrity: sha512-h8lQ8tacZYnR3vNQTgibj+tODHI5/+l06Au2Pcriv/Gmet0eaj4TwWH41sO9wnHDiQsEj19q0drzdWdeAHtweg==} engines: {node: '>=6.5'} @@ -1484,6 +1534,13 @@ packages: resolution: {integrity: sha512-WkrWp9GR4KXfKGYzOLmTuGVi1UWFfws377n9cc55/tb6DuqyF6pcQ5AbiHEshaDpY9v6oaSr2XCDidGmMwdzIA==} engines: {node: '>=8'} + discord-api-types@0.37.83: + resolution: {integrity: sha512-urGGYeWtWNYMKnYlZnOnDHm8fVRffQs3U0SpE8RHeiuLKb/u92APS8HoQnPTFbnXmY1vVnXjXO4dOxcAn3J+DA==} + + discord.js@14.15.3: + resolution: {integrity: sha512-/UJDQO10VuU6wQPglA4kz2bw2ngeeSbogiIPx/TsnctfzV/tNf+q+i1HlgtX1OGpeOBpJH9erZQNO5oRM2uAtQ==} + engines: {node: '>=16.11.0'} + diskusage@1.2.0: resolution: {integrity: sha512-2u3OG3xuf5MFyzc4MctNRUKjjwK+UkovRYdD2ed/NZNZPrt0lqHnLKxGhlFVvAb4/oufIgQG3nWgwmeTbHOvXA==} @@ -2051,12 +2108,18 @@ packages: lodash.isarguments@3.1.0: resolution: {integrity: sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==} + lodash.snakecase@4.1.1: + resolution: {integrity: sha512-QZ1d4xoBHYUeuouhEq3lk3Uq7ldgyFXGBhg04+oRLnIz8o9T65Eh+8YdroUwn846zchkA9yDsDl5CVVaV2nqYw==} + lodash.sortby@4.7.0: resolution: {integrity: sha512-HDWXG8isMntAyRF5vZ7xKuEvOhT4AhlRt/3czTSjvGUxjYCBVRQY48ViDHyfYz9VIoBkW4TMGQNapx+l3RUwdA==} lodash@4.1.0: resolution: {integrity: sha512-B9sgtKUlz0xe7lkYb80BcOpwwJJw5iOiz4HkBDzF0+i5nJLiwfBnL08m7bBkCOPBfi+0aqvrJDMdZDfAvs8vYg==} + lodash@4.17.21: + resolution: {integrity: sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==} + log-symbols@4.1.0: resolution: {integrity: sha512-8XPvpAA8uyhfteu8pIvQxpJZ7SYYdpUivZpGy6sFsBuKRY/7rQGavedeB8aK+Zkyq6upMFVL/9AW6vOYzfRyLg==} engines: {node: '>=10'} @@ -2075,6 +2138,9 @@ packages: resolution: {integrity: sha512-zobTr7akeGHnv7eBOXcRgMeCP6+uyYsczwmeRCauvpvaAltgNyTbLH/+VaEAPUeWBT+1GuNmz4wC/6jtQzbbVA==} engines: {node: '>=12'} + magic-bytes.js@1.10.0: + resolution: {integrity: sha512-/k20Lg2q8LE5xiaaSkMXk4sfvI+9EGEykFS4b0CHHGWqDYU0bGUFSwchNOMA56D7TCs9GwVTkqe9als1/ns8UQ==} + make-error@1.3.6: resolution: {integrity: sha512-s8UhlNe7vPKomQhC1qFelMokr/Sc3AgNbso3n74mVPA5LTZwkB9NlXf4XPamLxJE8h0gh73rM94xvwRT2CVInw==} @@ -2777,6 +2843,9 @@ packages: ts-interface-checker@0.1.13: resolution: {integrity: sha512-Y/arvbn+rrz3JCKl9C4kVNfTfSm2/mEp5FSz5EsZSANGPSlQrpRI5M4PKF+mJnE52jOO90PnPSc3Ur3bTQw0gA==} + ts-mixer@6.0.4: + resolution: {integrity: sha512-ufKpbmrugz5Aou4wcr5Wc1UUFWOLhq+Fm6qa6P0w0K5Qw2yhaUoiWszhCVuNQyNwrlGiscHOmqYoAox1PtvgjA==} + ts-node@10.9.2: resolution: {integrity: sha512-f0FFpIdcHgn8zcPSbf1dRevwt047YMnaiJM3u2w2RewrB+fob/zePZcrOyQoLMMO7aBIddLcQIEK5dYjkLnGrQ==} hasBin: true @@ -2791,6 +2860,9 @@ packages: '@swc/wasm': optional: true + tslib@2.6.2: + resolution: {integrity: sha512-AEYxH93jGFPn/a2iVAwW87VuUIkR1FVUKB77NwMF7nBTDkDrrT/Hpt/IrCJ0QXhW27jTBDcf5ZY7w6RiqTMw2Q==} + tslib@2.6.3: resolution: {integrity: sha512-xNvxJEOUiWPGhUuUdQgAJPKOOJfGnIyKySOc09XkKsgdUV/3E2zvwZYdejjmRgPCgcym1juLH3226yA7sEFJKQ==} @@ -2867,6 +2939,10 @@ packages: undici-types@5.26.5: resolution: {integrity: sha512-JlCMO+ehdEIKqlFxk6IfVoAUVmgz7cU7zD/h9XZ0qzeosSHmUJVOzSQvvYSYWXkFXC+IfLKSIffhv0sVZup6pA==} + undici@6.13.0: + resolution: {integrity: sha512-Q2rtqmZWrbP8nePMq7mOJIN98M0fYvSgV89vwl/BQRT4mDOeY2GXZngfGpcBBhtky3woM7G24wZV3Q304Bv6cw==} + engines: {node: '>=18.0'} + universalify@0.2.0: resolution: {integrity: sha512-CJ1QgKmNg3CwvAv/kOFmtnEN05f0D/cn9QntgNOQlQF9dgvVTHj3t+8JPdjqawCHk7V/KA+fbUqzZ9XWhcqPUg==} engines: {node: '>= 4.0.0'} @@ -2950,6 +3026,18 @@ packages: wrappy@1.0.2: resolution: {integrity: sha512-l4Sp/DRseor9wL6EvV2+TuQn63dMkPjZ/sp9XkghTEbV9KlPS1xUsZ3u7/IQO4wxtcFB4bgpQPRcR3QCvezPcQ==} + ws@8.18.0: + resolution: {integrity: sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==} + engines: {node: '>=10.0.0'} + peerDependencies: + bufferutil: ^4.0.1 + utf-8-validate: '>=5.0.2' + peerDependenciesMeta: + bufferutil: + optional: true + utf-8-validate: + optional: true + xml2js@0.6.2: resolution: {integrity: sha512-T4rieHaC1EXcES0Kxxj4JWgaUQHDk+qwHcYOCFHfiwKz7tOVPLq7Hjq9dM1WCMhylqMEfP7hMcOIChvotiZegA==} engines: {node: '>=4.0.0'} @@ -3533,6 +3621,53 @@ snapshots: enabled: 2.0.0 kuler: 2.0.0 + '@discordjs/builders@1.8.2': + dependencies: + '@discordjs/formatters': 0.4.0 + '@discordjs/util': 1.1.0 + '@sapphire/shapeshift': 3.9.7 + discord-api-types: 0.37.83 + fast-deep-equal: 3.1.3 + ts-mixer: 6.0.4 + tslib: 2.6.3 + + '@discordjs/collection@1.5.3': {} + + '@discordjs/collection@2.1.0': {} + + '@discordjs/formatters@0.4.0': + dependencies: + discord-api-types: 0.37.83 + + '@discordjs/rest@2.3.0': + dependencies: + '@discordjs/collection': 2.1.0 + '@discordjs/util': 1.1.0 + '@sapphire/async-queue': 1.5.3 + '@sapphire/snowflake': 3.5.3 + '@vladfrangu/async_event_emitter': 2.4.5 + discord-api-types: 0.37.83 + magic-bytes.js: 1.10.0 + tslib: 2.6.3 + undici: 6.13.0 + + '@discordjs/util@1.1.0': {} + + '@discordjs/ws@1.1.1': + dependencies: + '@discordjs/collection': 2.1.0 + '@discordjs/rest': 2.3.0 + '@discordjs/util': 1.1.0 + '@sapphire/async-queue': 1.5.3 + '@types/ws': 8.5.12 + '@vladfrangu/async_event_emitter': 2.4.5 + discord-api-types: 0.37.83 + tslib: 2.6.3 + ws: 8.18.0 + transitivePeerDependencies: + - bufferutil + - utf-8-validate + '@esbuild/aix-ppc64@0.21.5': optional: true @@ -3794,6 +3929,15 @@ snapshots: '@rollup/rollup-win32-x64-msvc@4.19.1': optional: true + '@sapphire/async-queue@1.5.3': {} + + '@sapphire/shapeshift@3.9.7': + dependencies: + fast-deep-equal: 3.1.3 + lodash: 4.17.21 + + '@sapphire/snowflake@3.5.3': {} + '@sinonjs/commons@2.0.0': dependencies: type-detect: 4.0.8 @@ -4212,6 +4356,12 @@ snapshots: '@types/triple-beam@1.3.5': {} + '@types/ws@8.5.12': + dependencies: + '@types/node': 20.14.13 + + '@vladfrangu/async_event_emitter@2.4.5': {} + abort-controller@3.0.0: dependencies: event-target-shim: 5.0.1 @@ -4610,6 +4760,26 @@ snapshots: dependencies: path-type: 4.0.0 + discord-api-types@0.37.83: {} + + discord.js@14.15.3: + dependencies: + '@discordjs/builders': 1.8.2 + '@discordjs/collection': 1.5.3 + '@discordjs/formatters': 0.4.0 + '@discordjs/rest': 2.3.0 + '@discordjs/util': 1.1.0 + '@discordjs/ws': 1.1.1 + '@sapphire/snowflake': 3.5.3 + discord-api-types: 0.37.83 + fast-deep-equal: 3.1.3 + lodash.snakecase: 4.1.1 + tslib: 2.6.2 + undici: 6.13.0 + transitivePeerDependencies: + - bufferutil + - utf-8-validate + diskusage@1.2.0: dependencies: es6-promise: 4.2.8 @@ -5289,10 +5459,14 @@ snapshots: lodash.isarguments@3.1.0: {} + lodash.snakecase@4.1.1: {} + lodash.sortby@4.7.0: {} lodash@4.1.0: {} + lodash@4.17.21: {} + log-symbols@4.1.0: dependencies: chalk: 4.1.2 @@ -5315,6 +5489,8 @@ snapshots: luxon@3.4.4: {} + magic-bytes.js@1.10.0: {} + make-error@1.3.6: {} merge-stream@2.0.0: {} @@ -6051,6 +6227,8 @@ snapshots: ts-interface-checker@0.1.13: {} + ts-mixer@6.0.4: {} + ts-node@10.9.2(@types/node@20.14.13)(typescript@5.5.4): dependencies: '@cspotcode/source-map-support': 0.8.1 @@ -6069,6 +6247,8 @@ snapshots: v8-compile-cache-lib: 3.0.1 yn: 3.1.1 + tslib@2.6.2: {} + tslib@2.6.3: {} tsup@8.2.3(tsx@4.16.2)(typescript@5.5.4): @@ -6168,6 +6348,8 @@ snapshots: undici-types@5.26.5: {} + undici@6.13.0: {} + universalify@0.2.0: {} url-parse@1.5.10: @@ -6274,6 +6456,8 @@ snapshots: wrappy@1.0.2: {} + ws@8.18.0: {} + xml2js@0.6.2: dependencies: sax: 1.2.1 diff --git a/services/capture/src/Record.ts b/services/capture/src/Record.ts index 1c87aa1..f1ba5a3 100644 --- a/services/capture/src/Record.ts +++ b/services/capture/src/Record.ts @@ -151,14 +151,15 @@ export default class Record { console.log('parallelUploads3 is complete.') } catch (e) { + // if we got an abort error, e.name is not AbortError as expected. Instead, e.name is Error. + // so in order to catch AbortError, we don't even look there. instead, we check if our abortcontroller was aborted. + // in other words, `(e.name === 'AbortError')` will never be true. + if (this.abortSignal.aborted) return; + if (e instanceof Error) { - if (e.name === 'AbortError') { - console.error(`We got an error, AbortError which is something we know how to handle. we will NOT throw and instead return gracefully.`) - return - } else { - console.error(`We were uploading a file to S3 but then we encountered an error! ${JSON.stringify(e, null, 2)}`) - throw e - } + console.error(`We were uploading a file to S3 but then we encountered an exception!`) + console.error(e) + throw e } else { throw new Error(`error of some sort ${JSON.stringify(e, null, 2)}`) } diff --git a/services/capture/src/index.ts b/services/capture/src/index.ts index bfda862..9121f96 100644 --- a/services/capture/src/index.ts +++ b/services/capture/src/index.ts @@ -27,7 +27,9 @@ const preset: GraphileConfig.Preset = { }; -async function api() { + + +async function doRunApi() { if (!process.env.PORT) throw new Error('PORT is missing in env'); console.log(`api FUNCTION listening on PORT ${process.env.PORT}`) const PORT = parseInt(process.env.PORT!) @@ -51,18 +53,42 @@ async function api() { }) } -async function worker(workerUtils: WorkerUtils) { +async function doRunWorker(workerUtils: WorkerUtils) { + + let workerIds: string[] = [] + const runnerOptions: RunnerOptions = { preset, concurrency, taskDirectory: join(__dirname, 'tasks'), - // taskList: { - // 'record': record, - // } } - + + + const runner = await graphileRun(runnerOptions) if (!runner) throw new Error('failed to initialize graphile worker'); + + /** + * This is likely only relevant during development. + * if nodemon restarts us, we need to unlock the graphile-worker job so it gets retried immediately by another worker. + * + */ + runner.events.on('worker:create', ({ worker }) => { + // There is no way to get workerIds on demand when the SIGUSR2 comes in, so we log the IDs ahead of time. + workerIds.push(worker.workerId) + }) + process.on('SIGUSR2', async () => { + console.warn(`SIGUSR2 detected! ulocking ${workerIds.length} workers, workerIds=${workerIds}`) + await workerUtils.forceUnlockWorkers(workerIds) + process.kill(process.pid, 'SIGTERM'); + }) + runner.events.on("pool:gracefulShutdown", async ({ workerPool, message }) => { + const workerIds = workerPool._workers.map((w) => w.workerId) + console.warn(`gracefulShutdown detected. releasing job locks on ${workerIds.length} workers, workerIds=${workerIds}, message=${message}`); + await workerUtils.forceUnlockWorkers(workerIds) + }); + + await runner.promise } @@ -74,9 +100,9 @@ async function main() { console.log(`@futureporn/capture version ${version} (FUNCTION=${process.env.FUNCTION})`) if (process.env.FUNCTION === 'api') { - api() + doRunApi() } else if (process.env.FUNCTION === 'worker') { - worker(workerUtils) + doRunWorker(workerUtils) } else { throw new Error('process.env.FUNCTION must be either api or worker. got '+process.env.FUNCTION) } @@ -85,5 +111,5 @@ async function main() { main().catch((err) => { console.error('there was an error!') console.error(err); - process.exit(1); + process.exit(874); }); diff --git a/services/capture/src/tasks/record.ts b/services/capture/src/tasks/record.ts index 1c37604..6a447a4 100644 --- a/services/capture/src/tasks/record.ts +++ b/services/capture/src/tasks/record.ts @@ -121,6 +121,8 @@ interface Payload { stream_id: string; } + + function assertPayload(payload: any): asserts payload is Payload { if (typeof payload !== "object" || !payload) throw new Error("invalid payload"); if (typeof payload.url !== "string") throw new Error("invalid url"); @@ -141,14 +143,8 @@ async function getRecordInstance(url: string, segment_id: number, helpers: Helpe const inputStream = Record.getFFmpegStream({ url: playlistUrl }) const onProgress = (fileSize: number) => { updateDatabaseRecord({ segment_id, fileSize, helpers }) - .then((reee) => { - - helpers.logger.info(JSON.stringify(reee)) - return reee - }) .then(checkIfAborted) .then((isAborted) => { - helpers.logger.info(`isAborted=${isAborted}`) isAborted ? abortController.abort() : null }) .catch((e) => { @@ -316,9 +312,11 @@ const doRecordSegment = async function doRecordSegment(url: string, stream_id: s export const record: Task = async function (payload: unknown, helpers: Helpers) { + + assertPayload(payload) const { url, stream_id } = payload - const recordId = stream_id + const streamId = stream_id try { /** * We do an exponential backoff timer when we record. If the Record() instance throws an error, we try again after a delay. @@ -329,16 +327,31 @@ export const record: Task = async function (payload: unknown, helpers: Helpers) * @todo We must implement retrying at a higher level, and retry a few times to handle this type of corner-case. */ // await backOff(() => doRecordSegment(url, recordId, helpers)) - await doRecordSegment(url, recordId, helpers) + await doRecordSegment(url, streamId, helpers) } catch (e) { // await updateDatabaseRecord({ recordId: stream_id, recordingState: 'failed' }) helpers.logger.error(`caught an error during record Task`) if (e instanceof Error) { - helpers.logger.error(e.message) + helpers.logger.info(`error.name=${e.name}`) + if (e.name === 'RoomOfflineError') { + // If room is offline, we want to retry until graphile-worker retries expire. + // We don't want to swallow the error so we simply log the error then let the below throw re-throw the error + // graphile-worker will retry when we re-throw the error below. + helpers.logger.info(`Room is offline.`) + } else if (e.name === 'AbortError') { + // If the recording was aborted by an admin, we want graphile-worker to stop retrying the record job. + // We swallow the error and return in order to mark the job as succeeded. + helpers.logger.info(`>>> we got an AbortError so we are ending the record job.`) + return + } else { + helpers.logger.error(e.message) + } } else { helpers.logger.error(JSON.stringify(e)) } - // throw e // @todo uncomment this for production + // we throw the error which fails the graphile-worker job, thus causing graphile-worker to restart/retry the job. + helpers.logger.error(`we got an error during record Task so we throw and retry`) + throw e } } diff --git a/services/migrations/migrations/00030_update-update_discord_message.sql b/services/migrations/migrations/00030_update-update_discord_message.sql new file mode 100644 index 0000000..835cd63 --- /dev/null +++ b/services/migrations/migrations/00030_update-update_discord_message.sql @@ -0,0 +1,14 @@ +-- instead of using record_id, we need to use stream_id +DROP FUNCTION public.tg__update_discord_message CASCADE; + +CREATE FUNCTION public.tg__update_discord_message() RETURNS trigger + LANGUAGE plpgsql SECURITY DEFINER + SET search_path TO 'pg_catalog', 'public', 'pg_temp' + AS $$ + begin + PERFORM graphile_worker.add_job('update_discord_message', json_build_object( + 'stream_id', NEW.id + ), max_attempts := 3); + return NEW; + end; + $$; \ No newline at end of file diff --git a/services/migrations/migrations/00031_recreate-update_stream-trigger.sql b/services/migrations/migrations/00031_recreate-update_stream-trigger.sql new file mode 100644 index 0000000..ddb6957 --- /dev/null +++ b/services/migrations/migrations/00031_recreate-update_stream-trigger.sql @@ -0,0 +1,5 @@ +-- when a stream is updated, we add a job in graphile to update_discord_message +CREATE TRIGGER stream_update + AFTER UPDATE ON api.streams + FOR EACH ROW + EXECUTE PROCEDURE public.tg__update_discord_message('update_discord_message'); diff --git a/services/migrations/migrations/00032_update-stream-when-segment-updates.sql b/services/migrations/migrations/00032_update-stream-when-segment-updates.sql new file mode 100644 index 0000000..18578ff --- /dev/null +++ b/services/migrations/migrations/00032_update-stream-when-segment-updates.sql @@ -0,0 +1,23 @@ +-- in order for discord chatops messages to be updated when a segment is updated, +-- we need to have postgres update the related stream timestamp when a segment is updated. + +CREATE OR REPLACE FUNCTION update_stream_on_segment_update() +RETURNS TRIGGER AS $$ +BEGIN + UPDATE api.streams + SET updated_at = NOW() + WHERE id IN ( + SELECT stream_id + FROM segments_stream_links + WHERE segment_id = NEW.id + ); + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + + +CREATE TRIGGER trigger_update_stream + AFTER UPDATE ON api.segments + FOR EACH ROW + EXECUTE FUNCTION update_stream_on_segment_update();