switch chatops from streams to vods
ci / build (push) Failing after 0s Details

This commit is contained in:
CJ_Clippy 2024-08-31 02:42:28 -08:00
parent fdd295e2b8
commit a5e4fee3a3
70 changed files with 1321 additions and 2087 deletions

View File

@ -207,6 +207,7 @@ docker_build(
], ],
dockerfile='./dockerfiles/scout.dockerfile', dockerfile='./dockerfiles/scout.dockerfile',
target='dev', target='dev',
# target='prod',
live_update=[ live_update=[
sync('./services/scout', '/app/services/scout') sync('./services/scout', '/app/services/scout')
] ]

View File

@ -20,6 +20,8 @@ spec:
- name: bot - name: bot
image: "{{ .Values.bot.imageName }}" image: "{{ .Values.bot.imageName }}"
env: env:
- name: SCOUT_URL
value: "{{ .Values.scout.url }}"
- name: POSTGREST_URL - name: POSTGREST_URL
value: "{{ .Values.postgrest.url }}" value: "{{ .Values.postgrest.url }}"
- name: NODE_ENV - name: NODE_ENV

View File

@ -40,6 +40,8 @@ spec:
- name: capture-worker - name: capture-worker
image: "{{ .Values.capture.imageName }}" image: "{{ .Values.capture.imageName }}"
env: env:
- name: SCOUT_URL
value: "{{ .Values.scout.url }}"
- name: FUNCTION - name: FUNCTION
value: worker value: worker
- name: WORKER_CONNECTION_STRING - name: WORKER_CONNECTION_STRING

View File

@ -0,0 +1,79 @@
---
apiVersion: v1
kind: Service
metadata:
name: chihaya
namespace: futureporn
annotations:
external-dns.alpha.kubernetes.io/hostname: "{{ .Values.chihaya.hostname }}"
spec:
type: LoadBalancer
selector:
app: chihaya
ports:
- name: http
port: 80
targetPort: 80
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: chihaya
spec:
replicas: 1
selector:
matchLabels:
app: chihaya
template:
metadata:
labels:
app: chihaya
spec:
containers:
- name: chihaya
image: "quay.io/jzelinskie/chihaya-git:latest"
ports:
- containerPort: 80
---
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
name: chihaya-http
namespace: futureporn
spec:
entryPoints:
- web
routes:
- match: Host(`chihaya.fp.sbtp.xyz`)
kind: Rule
middlewares:
- name: redirect
namespace: futureporn
services:
- name: chihaya
port: web
---
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
name: chihaya-https
namespace: futureporn
annotations:
cert-manager.io/cluster-issuer: "{{ .Values.certManager.issuer }}"
spec:
entryPoints:
- websecure
routes:
- match: Host(`chihaya.fp.sbtp.xyz`)
kind: Rule
services:
- name: chihaya
namespace: futureporn
port: web
tls:
secretName: chihaya-tls

View File

@ -19,6 +19,9 @@ spec:
containers: containers:
- name: scout - name: scout
image: "{{ .Values.scout.imageName }}" image: "{{ .Values.scout.imageName }}"
ports:
- name: http
containerPort: {{ .Values.scout.port }}
env: env:
- name: POSTGREST_URL - name: POSTGREST_URL
value: "{{ .Values.postgrest.url }}" value: "{{ .Values.postgrest.url }}"
@ -47,3 +50,18 @@ spec:
memory: 1024Mi memory: 1024Mi
restartPolicy: Always restartPolicy: Always
---
apiVersion: v1
kind: Service
metadata:
name: scout
namespace: futureporn
spec:
type: ClusterIP
selector:
app: scout
ports:
- name: http
port: {{ .Values.scout.port }}
targetPort: http

View File

@ -72,10 +72,13 @@ bot:
discordGuildId: "1084674137391374338" discordGuildId: "1084674137391374338"
imageName: fp/bot imageName: fp/bot
replicas: 1 replicas: 1
chihaya:
hostname: tracker.fp.sbtp.xyz
scout: scout:
imageName: fp/scout imageName: fp/scout
replicas: 1 replicas: 1
port: 5134 port: 5134
url: http://scout.futureporn.svc.cluster.local:5134
postgrest: postgrest:
url: http://postgrest.futureporn.svc.cluster.local:9000 url: http://postgrest.futureporn.svc.cluster.local:9000
image: postgrest/postgrest image: postgrest/postgrest

View File

@ -2,7 +2,7 @@ FROM node:20 AS base
ENV PNPM_HOME="/pnpm" ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH" ENV PATH="$PNPM_HOME:$PATH"
WORKDIR /app WORKDIR /app
RUN corepack enable && corepack prepare --activate RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
ENTRYPOINT ["pnpm"] ENTRYPOINT ["pnpm"]
FROM base AS install FROM base AS install

View File

@ -2,7 +2,7 @@ FROM node:20-alpine AS base
## Install dependencies only when needed ## Install dependencies only when needed
## Check https://github.com/nodejs/docker-node/tree/b4117f9333da4138b03a546ec926ef50a31506c3#nodealpine to understand why libc6-compat might be needed. ## Check https://github.com/nodejs/docker-node/tree/b4117f9333da4138b03a546ec926ef50a31506c3#nodealpine to understand why libc6-compat might be needed.
RUN apk add --no-cache libc6-compat RUN apk add --no-cache libc6-compat
RUN corepack enable && corepack prepare --activate RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
## Enable `pnpm add --global` on Alpine Linux by setting ## Enable `pnpm add --global` on Alpine Linux by setting
## home location environment variable to a location already in $PATH ## home location environment variable to a location already in $PATH

View File

@ -10,7 +10,7 @@ ENV PATH="$PNPM_HOME:$PATH"
WORKDIR /app WORKDIR /app
COPY --from=mwader/static-ffmpeg:7.0.2 /ffmpeg /usr/local/bin/ COPY --from=mwader/static-ffmpeg:7.0.2 /ffmpeg /usr/local/bin/
COPY --from=mwader/static-ffmpeg:7.0.2 /ffprobe /usr/local/bin/ COPY --from=mwader/static-ffmpeg:7.0.2 /ffprobe /usr/local/bin/
RUN corepack enable && corepack prepare --activate RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
ENTRYPOINT ["pnpm"] ENTRYPOINT ["pnpm"]
FROM base AS install FROM base AS install

View File

@ -18,7 +18,7 @@ FROM node:20 AS base
ENV PNPM_HOME="/pnpm" ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH" ENV PATH="$PNPM_HOME:$PATH"
WORKDIR /app WORKDIR /app
RUN corepack enable && corepack prepare --activate RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
FROM base AS build FROM base AS build
WORKDIR /app WORKDIR /app

View File

@ -4,7 +4,7 @@ FROM node:20.15 AS base
ENV PNPM_HOME="/pnpm" ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH" ENV PATH="$PNPM_HOME:$PATH"
WORKDIR /app WORKDIR /app
RUN corepack enable && corepack prepare --activate RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
FROM base AS build FROM base AS build
# ENV NODE_ENV=development # ENV NODE_ENV=development

View File

@ -2,7 +2,7 @@ FROM node:20-alpine AS base
ENV PNPM_HOME="/pnpm" ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH" ENV PATH="$PNPM_HOME:$PATH"
WORKDIR /app WORKDIR /app
RUN corepack enable && corepack prepare --activate RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
FROM base AS build FROM base AS build
COPY ./pnpm-workspace.yaml ./.npmrc . COPY ./pnpm-workspace.yaml ./.npmrc .

View File

@ -6,7 +6,7 @@ FROM node:20-slim AS base
FROM base AS deps FROM base AS deps
ENV PNPM_HOME="/pnpm" ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH" ENV PATH="$PNPM_HOME:$PATH"
RUN corepack enable && corepack prepare --activate RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
WORKDIR /app WORKDIR /app

View File

@ -2,7 +2,7 @@ FROM node:20 AS base
ENV PNPM_HOME="/pnpm" ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH" ENV PATH="$PNPM_HOME:$PATH"
WORKDIR /app WORKDIR /app
RUN curl -s https://api.github.com/repos/yt-dlp/yt-dlp/releases/latest | grep "browser_download_url.*yt-dlp_linux\"" | cut -d : -f 2,3 | tr -d "\"" | wget -q -O /usr/local/bin/yt-dlp -i - && chmod +x /usr/local/bin/yt-dlp
## @important If pnpm is downloading node during the build, that's a bandwidth-expensive mistake. ## @important If pnpm is downloading node during the build, that's a bandwidth-expensive mistake.
## Node already exists in the docker image at /usr/local/bin/node. ## Node already exists in the docker image at /usr/local/bin/node.
## We should use the node version that exists in the docker image. ## We should use the node version that exists in the docker image.

View File

@ -1,6 +1,6 @@
FROM node:20 AS strapi FROM node:20 AS strapi
WORKDIR /usr/src/app/ WORKDIR /usr/src/app/
RUN corepack enable && corepack prepare --activate RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
ENV PNPM_HOME="/pnpm" ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH" ENV PATH="$PNPM_HOME:$PATH"
# ENV NODE_EXTRA_CA_CERTS ${NODE_EXTRA_CA_CERTS} # ENV NODE_EXTRA_CA_CERTS ${NODE_EXTRA_CA_CERTS}

View File

@ -2,7 +2,7 @@ FROM node:20-alpine3.18 AS base
## Installing libvips-dev for sharp Compatibility ## Installing libvips-dev for sharp Compatibility
## (only necessary for alpine docker images) ## (only necessary for alpine docker images)
RUN apk update && apk add --no-cache build-base gcc autoconf automake zlib-dev libpng-dev nasm bash vips-dev git RUN apk update && apk add --no-cache build-base gcc autoconf automake zlib-dev libpng-dev nasm bash vips-dev git
RUN corepack enable && corepack prepare --activate RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
ENV PNPM_HOME="/pnpm" ENV PNPM_HOME="/pnpm"
ENV PATH="$PNPM_HOME:$PATH" ENV PATH="$PNPM_HOME:$PATH"
ARG NODE_ENV=development ARG NODE_ENV=development

View File

@ -28,7 +28,7 @@ RUN mkdir -p /app/packages/worker && mkdir -p /prod/worker
## Copy manfiests, lockfiles, and configs into docker context ## Copy manfiests, lockfiles, and configs into docker context
COPY package.json pnpm-lock.yaml .npmrc . COPY package.json pnpm-lock.yaml .npmrc .
RUN corepack enable && corepack prepare --activate RUN corepack enable && corepack prepare pnpm@9.6.0 --activate
# COPY ./packages/image/pnpm-lock.yaml ./packages/image/package.json ./packages/image/ # COPY ./packages/image/pnpm-lock.yaml ./packages/image/package.json ./packages/image/
# COPY ./packages/storage/pnpm-lock.yaml ./packages/storage/package.json ./packages/storage/ # COPY ./packages/storage/pnpm-lock.yaml ./packages/storage/package.json ./packages/storage/
# COPY ./packages/types/pnpm-lock.yaml ./packages/types/package.json ./packages/types/ # COPY ./packages/types/pnpm-lock.yaml ./packages/types/package.json ./packages/types/

View File

@ -26,7 +26,7 @@ describe('image', function () {
this.timeout(1000*60*15) this.timeout(1000*60*15)
it('should accept a URL and return a path to image on disk', async function () { it('should accept a URL and return a path to image on disk', async function () {
// const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-06-25.mp4' // const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-06-25.mp4'
const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-08-10.mp4' const url = 'https://futureporn-b2.b-cdn.net/projektmelody-chaturbate-2024-08-31.mp4'
const imagePath = await getStoryboard(url) const imagePath = await getStoryboard(url)
expect(imagePath).to.match(/\.png/) expect(imagePath).to.match(/\.png/)
}) })

View File

@ -1,265 +0,0 @@
export = Futureporn;
export as namespace Futureporn;
declare namespace Futureporn {
type PlatformNotificationType = 'email' | 'manual' | 'twitter'
type ArchiveStatus = 'good' | 'issue' | 'missing'
type RecordingState = 'recording' | 'stalled' | 'aborted' | 'failed' | 'finished'
type ProcessingState = 'processing'
type WaitingState = 'pending_recording'
type Status = Partial<WaitingState | ProcessingState | RecordingState>
interface S3File {
s3_key: string;
s3_id?: string;
bucket: string;
created_at?: Date;
updated_at?: Date;
}
interface VtuberResponse {
id: string;
}
interface VtuberRecord {
id: string;
display_name: string;
chaturbate: string;
twitter: string;
patreon: string;
twitch: string;
tiktok: string;
onlyfans: string;
youtube: string;
linktree: string;
carrd: string;
fansly: string;
pornhub: string;
discord: string;
reddit: string;
throne: string;
instagram: string;
facebook: string;
merch: string;
slug: string;
image: string;
theme_color: string;
image_blur: string;
fansly_id: string;
chaturbate_id: string;
twitter_id: string;
}
interface VodRecord {
id: string;
stream: Stream;
stream_id: string;
created_at: string;
updated_at: string;
published_at?: string;
title?: string;
date: string;
mux_asset: MuxAsset;
thumbnail?: S3File;
vtuber: string;
ipfs_cid?: string;
torrent?: string;
announce_title?: string;
announce_url?: string;
note?: string;
url: string
}
interface VodResponse {
id: string;
stream: Stream;
stream_id: string;
created_at: string;
updated_at: string;
published_at?: string;
title?: string;
date: string;
mux_asset: MuxAsset;
thumbnail?: S3File;
vtuber: Vtuber;
tags?: Tag[];
timestamps?: Timestamp[];
ipfs_cid?: string;
s3_file?: S3File;
torrent?: string;
announce_title?: string;
announce_url?: string;
uploader?: User;
note?: string;
url: string
}
interface Stream {
id: string;
url: string;
platform_notification_type: PlatformNotificationType;
discord_message_id: string;
date: Date;
created_at: Date;
updated_at: Date;
vtuber: string;
tweet: string;
vods?: Vod[];
archive_status: ArchiveStatus;
is_chaturbate_stream: Boolean;
is_fansly_stream: Boolean;
is_recording_aborted: Boolean;
status: Status;
segments?: Segment[]
}
interface RecordingRecord {
id: number;
recording_state: RecordingState;
file_size: number;
discord_message_id: string;
is_recording_aborted: boolean;
updated_at: Date;
created_at: Date;
}
interface Segment {
id: number;
s3_key: string;
s3_id: string;
bytes: number;
stream?: Stream[];
created_at: Date;
updated_at: Date;
}
interface IMuxAsset {
id: number;
attributes: {
playbackId: string;
assetId: string;
}
}
interface IPagination {
page: number;
pageSize: number;
pageCount: number;
total: number;
}
interface IMuxAssetResponse {
data: IMuxAsset;
meta: IMeta;
}
interface IMeta {
pagination: IPagination;
}
interface IPlatformNotification {
id: number;
attributes: {
source: string;
platform: string;
date: string;
date2: string;
vtuber: number;
}
}
interface IPlatformNotificationResponse {
data: IPlatformNotification;
meta: IMeta;
error?: any;
}
interface IStream {
id: number;
attributes: {
date: string;
date2: string;
archiveStatus: ArchiveStatus;
vods: IVodsResponse;
cuid: string;
vtuber: IVtuberResponse;
tweet: ITweetResponse;
isChaturbateStream: boolean;
isFanslyStream: boolean;
platformNotifications: IPlatformNotification[];
}
}
interface IStreamResponse {
data: IStream;
meta: IMeta;
error?: any;
}
interface IStreamsResponse {
data: IStream[];
meta: IMeta;
}
interface IVtuber {
id: number;
attributes: {
slug: string;
displayName: string;
chaturbate?: string;
twitter?: string;
patreon?: string;
twitch?: string;
tiktok?: string;
onlyfans?: string;
youtube?: string;
linktree?: string;
carrd?: string;
fansly?: string;
pornhub?: string;
discord?: string;
reddit?: string;
throne?: string;
instagram?: string;
facebook?: string;
merch?: string;
vods: IVod[];
description1: string;
description2?: string;
image: string;
imageBlur?: string;
themeColor: string;
fanslyId?: string;
chaturbateId?: string;
twitterId?: string;
}
}
interface IVtuberResponse {
data: IVtuber;
meta: IMeta;
}
interface IVtubersResponse {
data: IVtuber[];
meta: IMeta;
}
type NotificationData = {
isMatch: boolean;
url?: string;
platform?: string;
channel?: string;
displayName?: string;
date?: string;
userId?: string | null;
avatar?: string;
};
}

View File

@ -8,7 +8,7 @@
"clean": "rm -rf dist", "clean": "rm -rf dist",
"superclean": "rm -rf node_modules && rm -rf pnpm-lock.yaml && rm -rf dist" "superclean": "rm -rf node_modules && rm -rf pnpm-lock.yaml && rm -rf dist"
}, },
"main": "index.d.ts", "main": "src/index.ts",
"keywords": [], "keywords": [],
"author": "", "author": "",
"license": "Unlicense", "license": "Unlicense",

348
packages/types/src/index.ts Normal file
View File

@ -0,0 +1,348 @@
export type PlatformNotificationType = 'email' | 'manual' | 'twitter'
export type ArchiveStatus = 'good' | 'issue' | 'missing'
export type RecordingState = 'recording' | 'stalled' | 'aborted' | 'failed' | 'finished'
export type ProcessingState = 'processing'
export type WaitingState = 'pending_recording'
export type Status = Partial<WaitingState | ProcessingState | RecordingState>
export interface S3File {
s3_key: string;
s3_id?: string;
bucket: string;
created_at?: Date;
updated_at?: Date;
}
export interface VtuberResponse {
id: string;
}
export interface VtuberDataScrape {
display_name: string;
slug: string;
chaturbate_id?: string;
chaturbate?: string;
fansly_id?: string;
fansly?: string;
}
export interface ScoutResponse {
error: boolean;
message: string;
data: any;
}
export interface ChaturbateRoomSummary {
url: string;
name: string;
}
export interface ChaturbateRoom {
name: string;
username: string;
room_subject: string;
tags: string[];
is_new: boolean;
num_users: number;
num_followers: number;
current_show: string;
location?: string;
country?: string;
spoken_languages?: string;
display_name?: string;
birthday?: string;
is_hd: boolean;
age: number;
seconds_online: number;
image_url: string;
image_url_360x270: string;
chat_room_url_revshare: string;
iframe_embed_revshare: string;
chat_room_url: string;
iframe_embed: string;
slug: string;
}
export interface VtuberRecord {
id: string;
display_name: string;
chaturbate: string;
twitter: string;
patreon: string;
twitch: string;
tiktok: string;
onlyfans: string;
youtube: string;
linktree: string;
carrd: string;
fansly: string;
pornhub: string;
discord: string;
reddit: string;
throne: string;
instagram: string;
facebook: string;
merch: string;
slug: string;
image: string;
theme_color: string;
image_blur: string;
fansly_id: string;
chaturbate_id: string;
twitter_id: string;
}
export interface VodRecord {
id: string;
stream: Stream;
stream_id: string;
created_at: string;
updated_at: string;
published_at?: string;
title?: string;
date: string;
mux_asset: MuxAssetRecord;
thumbnail?: S3File;
vtuber: string;
ipfs_cid?: string;
torrent?: string;
announce_title?: string;
announce_url?: string;
note?: string;
url: string;
discord_message_id: string;
}
export interface TagRecord {
id: string
}
export interface Timestamp {
id: string
}
export interface User {
id: string
}
export interface VodResponse {
id: string;
stream: Stream;
stream_id: string;
created_at: string;
updated_at: string;
published_at?: string;
title?: string;
date: string;
mux_asset: MuxAssetRecord;
thumbnail?: S3File;
vtuber: VtuberRecord;
tags?: TagRecord[];
timestamps?: Timestamp[];
ipfs_cid?: string;
s3_file?: S3File;
torrent?: string;
announce_title?: string;
announce_url?: string;
uploader?: User;
note?: string;
url: string;
segments?: SegmentResponse[];
status: Status;
discord_message_id: string;
is_recording_aborted: boolean;
}
export interface Stream {
id: string;
url: string;
platform_notification_type: PlatformNotificationType;
discord_message_id: string;
date: Date;
created_at: Date;
updated_at: Date;
vtuber: string;
tweet: string;
vods?: Vod[];
archive_status: ArchiveStatus;
is_chaturbate_stream: Boolean;
is_fansly_stream: Boolean;
is_recording_aborted: Boolean;
status: Status;
segments?: SegmentResponse[]
}
export interface RecordingRecord {
id: number;
recording_state: RecordingState;
file_size: number;
discord_message_id: string;
is_recording_aborted: boolean;
updated_at: Date;
created_at: Date;
}
export interface SegmentResponse {
id: number;
s3_key: string;
s3_id: string;
bytes: number;
vod?: VodResponse;
created_at: string;
updated_at: string;
}
export interface MuxAssetRecord {
id: number;
playbackId: string;
assetId: string;
}
export interface IMuxAsset {
id: number;
attributes: {
playbackId: string;
assetId: string;
}
}
export interface IPagination {
page: number;
pageSize: number;
pageCount: number;
total: number;
}
export interface IMuxAssetResponse {
data: IMuxAsset;
meta: IMeta;
}
export interface IMeta {
pagination: IPagination;
}
export interface IPlatformNotification {
id: number;
attributes: {
source: string;
platform: string;
date: string;
date2: string;
vtuber: number;
}
}
export interface IPlatformNotificationResponse {
data: IPlatformNotification;
meta: IMeta;
error?: any;
}
export interface Vod {
id: string
}
export interface IVodsResponse {
id: string
}
export interface IVod {
id: string
}
export interface ITweetResponse {
id: string
}
export interface IStream {
id: number;
attributes: {
date: string;
date2: string;
archiveStatus: ArchiveStatus;
vods: IVodsResponse;
cuid: string;
vtuber: IVtuberResponse;
tweet: ITweetResponse;
isChaturbateStream: boolean;
isFanslyStream: boolean;
platformNotifications: IPlatformNotification[];
}
}
export interface IStreamResponse {
data: IStream;
meta: IMeta;
error?: any;
}
export interface IStreamsResponse {
data: IStream[];
meta: IMeta;
}
export interface IVtuber {
id: number;
attributes: {
slug: string;
displayName: string;
chaturbate?: string;
twitter?: string;
patreon?: string;
twitch?: string;
tiktok?: string;
onlyfans?: string;
youtube?: string;
linktree?: string;
carrd?: string;
fansly?: string;
pornhub?: string;
discord?: string;
reddit?: string;
throne?: string;
instagram?: string;
facebook?: string;
merch?: string;
vods: IVod[];
description1: string;
description2?: string;
image: string;
imageBlur?: string;
themeColor: string;
fanslyId?: string;
chaturbateId?: string;
twitterId?: string;
}
}
export interface IVtuberResponse {
data: IVtuber;
meta: IMeta;
}
export interface IVtubersResponse {
data: IVtuber[];
meta: IMeta;
}
export type NotificationData = {
isMatch: boolean;
url?: string;
platform?: string;
channel?: string;
displayName?: string;
date?: string;
userId?: string | null;
avatar?: string;
};

View File

@ -4,7 +4,7 @@
"esModuleInterop": true, "esModuleInterop": true,
"skipLibCheck": true, "skipLibCheck": true,
"target": "es2022", "target": "es2022",
"allowJs": true, "allowJs": false,
"resolveJsonModule": true, "resolveJsonModule": true,
"moduleDetection": "force", "moduleDetection": "force",
"isolatedModules": true, "isolatedModules": true,
@ -28,7 +28,7 @@
}, },
// Include the necessary files for your project // Include the necessary files for your project
"files": [ "files": [
"index.d.ts" "src/index.ts"
], ],
"exclude": [ "exclude": [
"node_modules" "node_modules"

View File

@ -13,6 +13,6 @@
# * * * * * task ?opts {payload} # * * * * * task ?opts {payload}
## every n minutes, we see which /vods are stale and we mark them as such. ## every 1 minutes, we see which /vods are stale and we mark them as such.
## this prevents stalled Record updates by marking stalled recordings as stopped ## this prevents stalled Record updates by marking stalled recordings as stopped
* * * * * update_svod_statuses ?max=1 { stalled_minutes:1 } * * * * * update_vod_statuses ?max=1 { stalled_minutes:1, finished_minutes:2 }

View File

@ -25,7 +25,6 @@
"dependencies": { "dependencies": {
"@discordeno/bot": "19.0.0-next.746f0a9", "@discordeno/bot": "19.0.0-next.746f0a9",
"@discordeno/rest": "19.0.0-next.b3a8c86", "@discordeno/rest": "19.0.0-next.b3a8c86",
"@futureporn/scout": "workspace:^",
"@paralleldrive/cuid2": "^2.2.2", "@paralleldrive/cuid2": "^2.2.2",
"@types/node": "^22.2.0", "@types/node": "^22.2.0",
"@types/qs": "^6.9.15", "@types/qs": "^6.9.15",

View File

@ -14,9 +14,6 @@ importers:
'@discordeno/rest': '@discordeno/rest':
specifier: 19.0.0-next.b3a8c86 specifier: 19.0.0-next.b3a8c86
version: 19.0.0-next.b3a8c86 version: 19.0.0-next.b3a8c86
'@futureporn/scout':
specifier: workspace:^
version: link:../../packages/scout
'@paralleldrive/cuid2': '@paralleldrive/cuid2':
specifier: ^2.2.2 specifier: ^2.2.2
version: 2.2.2 version: 2.2.2
@ -91,6 +88,42 @@ importers:
specifier: ^5.5.4 specifier: ^5.5.4
version: 5.5.4 version: 5.5.4
../..: {}
../../packages/image: {}
../../packages/infra: {}
../../packages/meal: {}
../../packages/old: {}
../../packages/storage: {}
../../packages/taco: {}
../../packages/types: {}
../../packages/utils: {}
../../packages/video: {}
../capture: {}
../factory: {}
../mailbox: {}
../migrations: {}
../next: {}
../scout: {}
../strapi: {}
../uppy: {}
packages: packages:
'@babel/code-frame@7.24.7': '@babel/code-frame@7.24.7':

View File

@ -15,7 +15,7 @@ createCommand({
if (!message) return bot.logger.error('interaction.message was missing'); if (!message) return bot.logger.error('interaction.message was missing');
if (!message.id) return bot.logger.error(`interaction.message.id was missing`); if (!message.id) return bot.logger.error(`interaction.message.id was missing`);
const url = `${configs.postgrestUrl}/streams?discord_message_id=eq.${message.id}`; const url = `${configs.postgrestUrl}/vods?discord_message_id=eq.${message.id}`;
const options = { const options = {
method: 'PATCH', method: 'PATCH',
headers: { headers: {
@ -30,15 +30,15 @@ createCommand({
}) })
}; };
let streamId: string; let vodId: string;
try { try {
const response = await fetch(url, options); const response = await fetch(url, options);
bot.logger.info(`response.ok=${response.ok}`) bot.logger.info(`response.ok=${response.ok}`)
const data: any = await response.json(); const data: any = await response.json();
streamId = data?.at(0).id vodId = data?.at(0).id
bot.logger.info(interaction.user); bot.logger.info(interaction.user);
interaction.respond(`<@${interaction.user.id}> cancelled recording on Stream ${streamId}`, { isPrivate: false }) interaction.respond(`<@${interaction.user.id}> cancelled recording on VOD ${vodId}`, { isPrivate: false })
bot.logger.info(`Cancel command successfully ran on message.id=${message.id}`) bot.logger.info(`Cancel command successfully ran on message.id=${message.id}`)
} catch (error) { } catch (error) {

View File

@ -7,9 +7,7 @@ createCommand({
type: ApplicationCommandTypes.ChatInput, type: ApplicationCommandTypes.ChatInput,
async execute(interaction: Interaction) { async execute(interaction: Interaction) {
const ping = Date.now() - snowflakeToTimestamp(interaction.id) const ping = Date.now() - snowflakeToTimestamp(interaction.id)
const embeds = createEmbeds().setTitle(`The bot ping is ${ping}ms`) const embeds = createEmbeds().setTitle(`The bot ping is ${ping}ms`)
await interaction.respond({ embeds }) await interaction.respond({ embeds })
}, },
}) })

View File

@ -5,6 +5,7 @@ import getStreamFromDatabase from '../fetchers/getStreamFromDatabase.ts'
import patchStreamInDatabase from '../fetchers/patchStreamInDatabase.ts' import patchStreamInDatabase from '../fetchers/patchStreamInDatabase.ts'
import { quickAddJob, type WorkerUtilsOptions } from 'graphile-worker' import { quickAddJob, type WorkerUtilsOptions } from 'graphile-worker'
import { configs } from '../config.ts' import { configs } from '../config.ts'
import findVod from '../fetchers/findVod.ts'
function throwErr(msg: string) { function throwErr(msg: string) {
logger.error(msg) logger.error(msg)
@ -19,12 +20,12 @@ createCommand({
const discord_message_id = String(interaction?.message?.id) const discord_message_id = String(interaction?.message?.id)
logger.info(`process command begins.`) logger.info(`process command begins.`)
if (!discord_message_id) return throwErr('failed to get discord message id'); if (!discord_message_id) return throwErr('failed to get discord message id');
const stream = await getStreamFromDatabase(discord_message_id) const vod = await findVod({ discord_message_id })
if (!stream) return throwErr('failed to get stream'); if (!vod) return throwErr('failed to get vod while finding vod to process');
const options: WorkerUtilsOptions = { connectionString: configs.connectionString } const options: WorkerUtilsOptions = { connectionString: configs.connectionString }
logger.info(`now we will quickAddJob process_video`) logger.info(`now we will quickAddJob process_video`)
await quickAddJob(options, 'process_video', { stream_id: stream.id }) await quickAddJob(options, 'process_video', { vod_id: vod.id })
logger.info(`now we will patchStreamInDatabase`) logger.info(`now we will patchStreamInDatabase`)
await patchStreamInDatabase(stream.id, { status: 'processing' }) await patchStreamInDatabase(vod.id, { status: 'processing' })
}, },
}) })

View File

@ -16,10 +16,15 @@ import createVod from '../fetchers/createVod.ts'
import findOrCreateVtuber from '../fetchers/findOrCreateVtuber.ts' import findOrCreateVtuber from '../fetchers/findOrCreateVtuber.ts'
import findOrCreateStream from '../fetchers/findOrCreateStream.ts' import findOrCreateStream from '../fetchers/findOrCreateStream.ts'
/**
*
* Get the livestream URL from the pre-existing discord message. IDK why we do this
*/
async function getUrlFromMessage(interaction: Interaction): Promise<string|null> { async function getUrlFromMessage(interaction: Interaction): Promise<string|null> {
const messageId = interaction.message?.id const messageId = interaction.message?.id
const pgRequestUrl = `${configs.postgrestUrl}/streams?discord_message_id=eq.${messageId}` const pgRequestUrl = `${configs.postgrestUrl}/vods?discord_message_id=eq.${messageId}`
logger.info(`pgRequestUrl=${pgRequestUrl}`) logger.info(`pgRequestUrl=${pgRequestUrl}`)
const requestOptions = { const requestOptions = {
method: 'GET', method: 'GET',
@ -37,8 +42,8 @@ async function getUrlFromMessage(interaction: Interaction): Promise<string|null>
throw new Error(`Problem during getUrlFromMessage. res.status=${res.status}, res.statusText=${res.statusText}`) throw new Error(`Problem during getUrlFromMessage. res.status=${res.status}, res.statusText=${res.statusText}`)
} }
const json = await res.json() as Stream[] const json = await res.json() as Stream[]
const stream = json[0] const vod = json[0]
const url = stream?.url const url = vod?.url
if (!url) return null if (!url) return null
else return url else return url
} catch (e) { } catch (e) {
@ -110,11 +115,12 @@ createCommand({
throw new Error(msg) throw new Error(msg)
} }
const discord_message_id = message.id.toString()
const date = new Date() const date = new Date()
const vtuberId = await findOrCreateVtuber({ url }) const vtuberId = await findOrCreateVtuber({ url })
const streamId = await findOrCreateStream({ vtuberId, date }) const streamId = await findOrCreateStream({ vtuberId, date })
if (!streamId) throw new Error(`failed to find or create a Stream in database`); if (!streamId) throw new Error(`failed to find or create a Stream in database`);
const vodId = await createVod({ stream_id: streamId, vtuber: vtuberId, url }) const vodId = await createVod({ stream_id: streamId, vtuber: vtuberId, url, discord_message_id })
logger.info(`Success! We have created VOD id=${vodId}`) logger.info(`Success! We have created VOD id=${vodId}`)
} catch (e) { } catch (e) {

View File

@ -9,6 +9,7 @@ const requiredEnvVars = [
'DISCORD_GUILD_ID', 'DISCORD_GUILD_ID',
'DISCORD_APPLICATION_ID', 'DISCORD_APPLICATION_ID',
'AUTOMATION_USER_JWT', 'AUTOMATION_USER_JWT',
'SCOUT_URL'
] as const; ] as const;
const getEnvVar = (key: typeof requiredEnvVars[number]): string => { const getEnvVar = (key: typeof requiredEnvVars[number]): string => {
@ -28,10 +29,12 @@ export interface Config {
discordChannelId: string; discordChannelId: string;
connectionString: string; connectionString: string;
discordApplicationId: string; discordApplicationId: string;
scoutUrl: string;
} }
export const configs: Config = { export const configs: Config = {
scoutUrl: getEnvVar('SCOUT_URL'),
connectionString: getEnvVar('WORKER_CONNECTION_STRING'), connectionString: getEnvVar('WORKER_CONNECTION_STRING'),
httpProxy: getEnvVar('HTTP_PROXY'), httpProxy: getEnvVar('HTTP_PROXY'),
postgrestUrl: getEnvVar('POSTGREST_URL'), postgrestUrl: getEnvVar('POSTGREST_URL'),

View File

@ -22,7 +22,7 @@ const handleApplicationCommand = async function handleApplicationCommand (intera
const command = commands.get(interaction.data.name) const command = commands.get(interaction.data.name)
if (!command) { if (!command) {
bot.logger.error(`Command ${interaction.data.name} (customId=${interaction.data.customId}) not found`) bot.logger.error(`Command ${interaction.data.name} not found`)
return return
} }
@ -34,7 +34,7 @@ const handleMessageComponent = async function handleMessageComponent (interactio
if (!interaction.data) return if (!interaction.data) return
if (!interaction.data.customId) return if (!interaction.data.customId) return
const command = commands.get(interaction.data.customId) const command = commands.get(interaction.data.customId)
if (!command) return bot.logger.error(`Command ${interaction.data.customId} not found`); if (!command) return bot.logger.error(`Command customId=${interaction.data.customId} not found`);
execCommand(command, interaction) execCommand(command, interaction)
} }

View File

@ -1,6 +1,5 @@
import { configs } from "../config.ts" import { configs } from "../config.ts"
import type { VtuberRecord, VtuberResponse } from "@futureporn/types" import type { VtuberRecord, VtuberResponse } from "@futureporn/types"
import scrapeVtuberData from '@futureporn/scout/scrapeVtuberData.ts'
import { bot } from '../bot.ts' import { bot } from '../bot.ts'
import qs from 'qs' import qs from 'qs'
@ -59,6 +58,7 @@ async function findVtuber(query: Partial<vTuberSearchQuery>, options?: Partial<v
throw new Error(msg) throw new Error(msg)
} }
const json = await res.json() as VtuberResponse[] const json = await res.json() as VtuberResponse[]
bot.logger.info(`vtuber results as follows.`)
bot.logger.info(json) bot.logger.info(json)
return json?.at(0)?.id || null return json?.at(0)?.id || null
} }
@ -84,6 +84,8 @@ async function createVtuber(vtuber: Partial<VtuberRecord>): Promise<string> {
throw new Error(msg) throw new Error(msg)
} }
const json = await res.json() as VtuberResponse[] const json = await res.json() as VtuberResponse[]
bot.logger.info(`createVtuber with vtuber as follows`)
bot.logger.info(vtuber)
bot.logger.info(json) bot.logger.info(json)
const vtuberData = json[0] const vtuberData = json[0]
if (!vtuberData) throw new Error('failed to createVtuber') if (!vtuberData) throw new Error('failed to createVtuber')
@ -99,7 +101,8 @@ export default async function findOrCreateVtuber(query: Partial<vTuberSearchQuer
const foundVtuber = await findVtuber(query) const foundVtuber = await findVtuber(query)
if (!foundVtuber) { if (!foundVtuber) {
const vtuber = await scrapeVtuberData(url) bot.logger.info(`Failed to find vtuber, so we create one.`)
const vtuber = await fetch(`${configs.scoutUrl}/vtuber/data?url=${url}`).then((res): Promise<VtuberRecord> => res.json() as any)
return createVtuber(vtuber) return createVtuber(vtuber)
} else { } else {
return foundVtuber return foundVtuber

View File

@ -0,0 +1,28 @@
import type { VodResponse } from "@futureporn/types";
import { bot } from "../bot.ts";
import { configs } from "../config.ts";
export default async function findVod({ vod_id, discord_message_id }: { vod_id?: string, discord_message_id?: string }): Promise<VodResponse|null> {
const fetchUrl = (!!vod_id)
? `${configs.postgrestUrl}/vods?id=eq.${vod_id}&select=*,segments(bytes,updated_at,created_at)`
: `${configs.postgrestUrl}/vods?discord_message_id=eq.${discord_message_id}&select=*,segments(bytes,updated_at,created_at)`
bot.logger.info(fetchUrl)
const fetchOptions = {
method: 'GET',
headers: {
'Content-Type': 'application/json',
'Accept': 'application/json',
'Prefer': 'return=representation'
}
}
const res = await fetch(fetchUrl, fetchOptions)
if (!res.ok) {
const msg = `request failed. status=${res.status}, statusText=${res.statusText}`
bot.logger.error(msg)
throw new Error(msg)
}
const json = await res.json() as VodResponse[]
// bot.logger.info(`vod results as follows.`)
// bot.logger.info(json)
return json?.at(0) || null
}

View File

@ -30,7 +30,6 @@ async function setupGraphileWorker() {
const runnerOptions: RunnerOptions = { const runnerOptions: RunnerOptions = {
preset preset
} }
const runner = await run(runnerOptions) const runner = await run(runnerOptions)
if (!runner) throw new Error('failed to initialize graphile worker'); if (!runner) throw new Error('failed to initialize graphile worker');
await runner.promise await runner.promise
@ -39,22 +38,21 @@ async function setupGraphileWorker() {
async function setupBot() { async function setupBot() {
bot.logger.info('Starting @futureporn/bot.') bot.logger.info('Starting @futureporn/bot.')
bot.logger.info('Loading commands...') bot.logger.info('Loading commands...')
await importDirectory(join(__dirname, './commands')) await importDirectory(join(__dirname, './commands'))
bot.logger.info('Loading events...') bot.logger.info('Loading events...')
await importDirectory(join(__dirname, './events')) await importDirectory(join(__dirname, './events'))
await bot.start() await bot.start()
} }
async function main() { async function main() {
console.log('setting up bot')
await setupBot() await setupBot()
await setupGraphileWorker() console.log('setting up graphile worker')
await setupGraphileWorker() // this needs to run after setupBot() has run
console.log('updating discord application commands')
await updateApplicationCommands() // this needs to run after importDirectory() has run await updateApplicationCommands() // this needs to run after importDirectory() has run
} }

View File

@ -1,5 +1,5 @@
import 'dotenv/config' import 'dotenv/config'
import type { Status, Stream, Segment } from '@futureporn/types' import type { Status, Stream, SegmentResponse, VodRecord, VodResponse } from '@futureporn/types'
import { type Task, type Helpers } from 'graphile-worker' import { type Task, type Helpers } from 'graphile-worker'
import { intervalToDuration, formatDuration, isBefore, sub, max } from 'date-fns' import { intervalToDuration, formatDuration, isBefore, sub, max } from 'date-fns'
import prettyBytes from 'pretty-bytes' import prettyBytes from 'pretty-bytes'
@ -13,12 +13,13 @@ import {
} from '@discordeno/bot' } from '@discordeno/bot'
import { bot } from '../bot.ts' import { bot } from '../bot.ts'
import { configs } from '../config.ts' import { configs } from '../config.ts'
import findVod from '../fetchers/findVod.ts'
const yeahEmojiId = BigInt('1253191939461873756') const yeahEmojiId = BigInt('1253191939461873756')
interface Payload { interface Payload {
vod_id: number; vod_id: string;
} }
@ -26,39 +27,29 @@ interface Payload {
function assertPayload(payload: any): asserts payload is Payload { function assertPayload(payload: any): asserts payload is Payload {
if (typeof payload !== "object" || !payload) throw new Error("invalid payload"); if (typeof payload !== "object" || !payload) throw new Error("invalid payload");
if (!payload.vod_id) throw new Error(`vod_id was absent in the payload`); if (!payload.vod_id) throw new Error(`vod_id was absent in the payload`);
if (typeof payload.vod_id !== 'string') throw new Error(`vod_id was not a string`);
} }
async function editDiscordMessage({ helpers, stream }: { stream: Stream, helpers: Helpers }) { async function editDiscordMessage({ helpers, vod }: { vod: VodResponse, helpers: Helpers }) {
// bot.logger.info(`editDiscordmessage with vod status=${vod.status} `)
const discordMessageId = stream.discord_message_id // bot.logger.info(vod)
const discordMessageId = vod.discord_message_id
if (!discordMessageId) throw new Error(`discordMessageId was missing!`); if (!discordMessageId) throw new Error(`discordMessageId was missing!`);
if (typeof discordMessageId !== 'string') throw new Error(`discordMessageId was not a string!`); if (typeof discordMessageId !== 'string') throw new Error(`discordMessageId was not a string!`);
const channelId = BigInt(configs.discordChannelId) const channelId = BigInt(configs.discordChannelId)
const updatedMessage: EditMessage = { const updatedMessage: EditMessage = {
embeds: getEmbeds(stream), embeds: getEmbeds(vod, helpers),
components: getButtonRow(stream.status) components: getButtonRow(vod.status)
} }
bot.helpers.editMessage(channelId, discordMessageId, updatedMessage) bot.helpers.editMessage(channelId, discordMessageId, updatedMessage)
} }
async function getStreamFromDatabase(streamId: number) {
const res = await fetch(`${configs.postgrestUrl}/streams?select=*,segments(*)&id=eq.${streamId}`)
if (!res.ok) {
throw new Error(`failed fetching stream ${streamId}. status=${res.status}, statusText=${res.statusText}`)
}
const body = await res.json() as Stream[]
return body[0];
}
/** /**
@ -72,12 +63,13 @@ export const update_discord_message: Task = async function (payload, helpers: He
try { try {
assertPayload(payload) assertPayload(payload)
const { vod_id } = payload const { vod_id } = payload
const streamId = vod_id const vodId = vod_id
const stream = await getStreamFromDatabase(streamId) const vod = await findVod({ vod_id: vodId })
if (!stream) throw new Error('failed to get stream from database'); if (!vod) throw new Error('failed to get vod from database');
// helpers.logger.info(`update_discord_message with streamId=${streamId}. stream=${JSON.stringify(stream)}`) // helpers.logger.info(`update_discord_message got the following vod`)
editDiscordMessage({ helpers, stream }) // helpers.logger.info(JSON.stringify(vod, null, 2))
await editDiscordMessage({ helpers, vod })
} catch (e) { } catch (e) {
helpers.logger.error(`caught an error during update_discord_message. e=${e}`) helpers.logger.error(`caught an error during update_discord_message. e=${e}`)
} }
@ -85,13 +77,14 @@ export const update_discord_message: Task = async function (payload, helpers: He
function getEmbeds(stream: Stream) { function getEmbeds(vod: VodResponse, helpers: Helpers) {
const streamId = stream.id const vodId = vod.id
const url = stream.url const url = vod.url
const segments = stream?.segments const segments = vod?.segments
const status = stream.status const status = vod.status || 'unknown'
bot.logger.info(`getEmbeds with vodId=${vodId}, status=${vod.status}, segments.length=${segments?.length}`)
const embeds = new EmbedsBuilder() const embeds = new EmbedsBuilder()
.setTitle(`Stream ${streamId}`) .setTitle(`VOD ${vodId}`)
.setFields([ .setFields([
{ name: 'Status', value: status.charAt(0).toUpperCase()+status.slice(1), inline: true }, { name: 'Status', value: status.charAt(0).toUpperCase()+status.slice(1), inline: true },
// { name: 'Filesize', value: prettyBytes(fileSize), inline: true }, // filesize isn't on stream. filesize is on segment. keeping for reference. @todo // { name: 'Filesize', value: prettyBytes(fileSize), inline: true }, // filesize isn't on stream. filesize is on segment. keeping for reference. @todo
@ -103,7 +96,7 @@ function getEmbeds(stream: Stream) {
.setColor(2326507) .setColor(2326507)
} else if (status === 'recording') { } else if (status === 'recording') {
embeds embeds
.setDescription('The stream is being recorded.') .setDescription('The vod is being recorded.')
.setColor(392960) .setColor(392960)
} else if (status === 'aborted') { } else if (status === 'aborted') {
embeds embeds
@ -123,7 +116,7 @@ function getEmbeds(stream: Stream) {
.setColor(392960) .setColor(392960)
} else if (status === 'stalled') { } else if (status === 'stalled') {
embeds embeds
.setDescription("We have not received a progress update in the past two minutes.") .setDescription("We have not received a progress update recently.")
.setColor(8289651) .setColor(8289651)
} else { } else {
embeds embeds
@ -133,10 +126,10 @@ function getEmbeds(stream: Stream) {
// Add an Embed for segments // Add an Embed for segments
if (segments) { if (segments) {
const getDuration = (s: Segment) => formatDuration(intervalToDuration({ start: s.created_at, end: s.updated_at })) const getDuration = (s: SegmentResponse) => formatDuration(intervalToDuration({ start: s.created_at, end: s.updated_at }))
embeds.newEmbed() embeds.newEmbed()
.setTitle(`Recording Segments`) .setTitle(`Recording Segments`)
.setFields(segments.map((s, i) => ( .setFields(segments.sort((a, b) => new Date(a.created_at).getTime() - new Date(b.created_at).getTime()).map((s, i) => (
{ {
name: `Segment ${i+1}`, name: `Segment ${i+1}`,
value: `${getDuration(s)} (${prettyBytes(s.bytes)})`, value: `${getDuration(s)} (${prettyBytes(s.bytes)})`,
@ -211,6 +204,7 @@ function getButtonRow(streamStatus: Status): ActionRow[] {
components.push(processButton) components.push(processButton)
} else { } else {
components.push(retryButton) components.push(retryButton)
components.push(processButton)
} }

View File

@ -7,108 +7,109 @@ import { configs } from '../config.ts'
interface Payload { interface Payload {
stalled_minutes: number; stalled_minutes: number;
finished_minutes: number;
} }
function assertPayload(payload: any): asserts payload is Payload { function assertPayload(payload: any): asserts payload is Payload {
if (typeof payload !== "object" || !payload) throw new Error("invalid payload"); if (typeof payload !== "object" || !payload) throw new Error("invalid payload");
if (!payload.stalled_minutes) throw new Error(`stalled_minutes was absent in the payload`); if (!payload.stalled_minutes) throw new Error(`stalled_minutes was absent in the payload`);
if (!payload.finished_minutes) throw new Error(`finished_minutes was absent in the payload`);
if (typeof payload.stalled_minutes !== 'number') throw new Error(`stalled_minutes parameter was not a number`); if (typeof payload.stalled_minutes !== 'number') throw new Error(`stalled_minutes parameter was not a number`);
if (typeof payload.finished_minutes !== 'number') throw new Error(`finished_minutes parameter was not a number`);
}
async function updateFinishedVods({
helpers,
finished_minutes,
url
}: {
helpers: Helpers,
finished_minutes: number,
url: string
}) {
helpers.logger.info(`updateFinishedVods with finished_minutes=${finished_minutes}, url=${url}`)
// 1. identify and update stalled /vods
// Any vods that was updated earlier than n minute ago AND is in 'pending_recording' or 'recording' state is marked as stalled.
const timestamp = sub(new Date(), { minutes: finished_minutes }).toISOString()
const queryOptions = {
updated_at: `lt.${timestamp}`,
or: '(status.eq.pending_recording,status.eq.recording)'
}
const updatePayload = {
updated_at: new Date().toISOString(),
status: 'stalled' as Status
}
// helpers.logger.info(JSON.stringify(updatePayload))
const query = qs.stringify(queryOptions)
const res = await fetch (`${url}?${query}`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${configs.automationUserJwt}`,
'Prefer': 'return=headers-only'
},
body: JSON.stringify(updatePayload)
})
if (!res.ok) {
const body = await res.text()
helpers.logger.info(JSON.stringify(res.headers))
helpers.logger.error(`Response code was not 200. status=${res.status}, statusText=${res.statusText}`)
helpers.logger.error(body)
return;
}
} }
async function updateStalledVods({ async function updateStalledVods({
helpers, helpers,
stalled_minutes, url,
url stalled_minutes = 1,
}: { }: {
helpers: Helpers, helpers: Helpers,
stalled_minutes: number, url: string,
url: string stalled_minutes?: number,
}) { }) {
// Identify and update stalled vods
const stalledTimestamp = sub(new Date(), { minutes: stalled_minutes }).toISOString();
const stalledQueryOptions = {
select: 'status,id,segments!inner(updated_at)',
'segments.updated_at': `lt.${stalledTimestamp}`,
or: '(status.eq.pending_recording,status.eq.recording)',
};
const stalledUpdatePayload = {
status: 'stalled',
};
// 1. identify and update stalled /vods const stalledQuery = qs.stringify(stalledQueryOptions, { encode: false });
// Any vods that was updated earlier than n minute ago AND is in 'pending_recording' or 'recording' state is marked as stalled. const stalledFetchUrl = `${url}?${stalledQuery}`;
const timestamp = sub(new Date(), { minutes: stalled_minutes }).toISOString() helpers.logger.info(`updateStalledVods with stalledFetchUrl=${stalledFetchUrl}, stalled_minutes=${stalled_minutes}, url=${url}`);
const queryOptions = { const stalledOptions = {
updated_at: `lt.${timestamp}`, method: 'PATCH',
or: '(status.eq.pending_recording,status.eq.recording)' headers: {
} 'Content-Type': 'application/json',
const updatePayload = { 'Authorization': `Bearer ${configs.automationUserJwt}`,
updated_at: new Date().toISOString(), 'Prefer': 'return=headers-only',
status: 'stalled' as Status },
} body: JSON.stringify(stalledUpdatePayload),
// helpers.logger.info(JSON.stringify(updatePayload)) };
const query = qs.stringify(queryOptions) const stalledRes = await fetch(stalledFetchUrl, stalledOptions);
const res = await fetch (`${url}?${query}`, { if (!stalledRes.ok) {
method: 'PATCH', const stalledBody = await stalledRes.text();
headers: { helpers.logger.info(JSON.stringify(stalledRes.headers));
'Content-Type': 'application/json', helpers.logger.error(`Stalled response code was not 200. status=${stalledRes.status}, statusText=${stalledRes.statusText}`);
'Authorization': `Bearer ${configs.automationUserJwt}`, helpers.logger.error(stalledBody);
'Prefer': 'return=headers-only' return;
}, }
body: JSON.stringify(updatePayload)
})
if (!res.ok) {
const body = await res.text()
helpers.logger.info(JSON.stringify(res.headers))
helpers.logger.error(`Response code was not 200. status=${res.status}, statusText=${res.statusText}`)
helpers.logger.error(body)
return;
}
}
async function updateRecordingVods({
helpers,
url
}: {
helpers: Helpers,
url: string
}) {
// identify and update recording /vods
// Any vods that has a segment that was updated within the past 1 minutes is considered recording
const timestamp = sub(new Date(), { minutes: 1 }).toISOString()
const queryOptions = {
select: 'status,id,segments!inner(updated_at)',
'segments.updated_at': `lt.${timestamp}`,
or: '(status.eq.pending_recording,status.eq.recording)',
}
const updatePayload = {
status: 'recording'
}
// helpers.logger.info(JSON.stringify(updatePayload))
const query = qs.stringify(queryOptions)
const options = {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${configs.automationUserJwt}`,
'Prefer': 'return=headers-only'
},
body: JSON.stringify(updatePayload)
}
const res = await fetch (`${url}?${query}`, options)
if (!res.ok) {
const body = await res.text()
helpers.logger.info(JSON.stringify(res.headers))
helpers.logger.error(`Response code was not 200. status=${res.status}, statusText=${res.statusText}`)
helpers.logger.error(body)
return;
}
} }
export const update_vod_statuses: Task = async function (payload: unknown, helpers: Helpers) { export const update_vod_statuses: Task = async function (payload: unknown, helpers: Helpers) {
assertPayload(payload) assertPayload(payload)
const { stalled_minutes } = payload const { stalled_minutes, finished_minutes } = payload
// helpers.logger.info(`update_vod_statuses has begun.`) helpers.logger.info(` & & update_vod_statuses has begun.`)
const url = 'http://postgrest.futureporn.svc.cluster.local:9000/vods' const url = 'http://postgrest.futureporn.svc.cluster.local:9000/vods'
try { try {
// await updateStalledVods({ helpers, url, stalled_minutes }) await updateStalledVods({ helpers, url, stalled_minutes })
await updateRecordingVods({ helpers, url }) await updateFinishedVods({ helpers, url, finished_minutes })
} catch (e: any) { } catch (e: any) {
if (e instanceof Error) { if (e instanceof Error) {
helpers.logger.error(`hi there we encountered an error while fetching /vods`) helpers.logger.error(`hi there we encountered an error while fetching /vods`)
@ -116,7 +117,6 @@ export const update_vod_statuses: Task = async function (payload: unknown, helpe
} else { } else {
helpers.logger.error(e) helpers.logger.error(e)
} }
} }
} }

View File

@ -21,7 +21,6 @@
"@aws-sdk/client-s3": "^3.617.0", "@aws-sdk/client-s3": "^3.617.0",
"@aws-sdk/lib-storage": "^3.588.0", "@aws-sdk/lib-storage": "^3.588.0",
"@aws-sdk/types": "^3.609.0", "@aws-sdk/types": "^3.609.0",
"@futureporn/scout": "workspace:^",
"@futureporn/types": "workspace:^", "@futureporn/types": "workspace:^",
"@futureporn/utils": "workspace:^", "@futureporn/utils": "workspace:^",
"@paralleldrive/cuid2": "^2.2.2", "@paralleldrive/cuid2": "^2.2.2",

View File

@ -17,9 +17,6 @@ importers:
'@aws-sdk/types': '@aws-sdk/types':
specifier: ^3.609.0 specifier: ^3.609.0
version: 3.609.0 version: 3.609.0
'@futureporn/scout':
specifier: workspace:^
version: link:../../packages/scout
'@futureporn/types': '@futureporn/types':
specifier: workspace:^ specifier: workspace:^
version: link:../../packages/types version: link:../../packages/types
@ -196,6 +193,42 @@ importers:
specifier: ^5.5.3 specifier: ^5.5.3
version: 5.5.4 version: 5.5.4
../..: {}
../../packages/image: {}
../../packages/infra: {}
../../packages/meal: {}
../../packages/old: {}
../../packages/storage: {}
../../packages/taco: {}
../../packages/types: {}
../../packages/utils: {}
../../packages/video: {}
../bot: {}
../factory: {}
../mailbox: {}
../migrations: {}
../next: {}
../scout: {}
../strapi: {}
../uppy: {}
packages: packages:
'@aws-crypto/crc32@5.2.0': '@aws-crypto/crc32@5.2.0':

View File

@ -1,10 +1,11 @@
import cheerio from 'cheerio' import { load } from 'cheerio'
import fetch from 'node-fetch' import fetch from 'node-fetch'
import scrapingFetch from '@futureporn/scout/scrapingFetch.ts'
export async function getRandomRoom () { export async function getRandomRoom () {
const res = await fetch('https://chaturbate.com/') const res = await scrapingFetch('https://chaturbate.com/')
const body = await res.text() const body = await res.text()
const $ = cheerio.load(body) const $ = load(body)
let roomsRaw = $('a[data-room]') let roomsRaw = $('a[data-room]')
let rooms = [] let rooms = []
$(roomsRaw).each((_, e) => { $(roomsRaw).each((_, e) => {

View File

@ -1,6 +1,7 @@
import 'dotenv/config' import 'dotenv/config'
const requiredEnvVars = [ const requiredEnvVars = [
'SCOUT_URL',
'S3_ACCESS_KEY_ID', 'S3_ACCESS_KEY_ID',
'S3_SECRET_ACCESS_KEY', 'S3_SECRET_ACCESS_KEY',
'S3_REGION', 'S3_REGION',
@ -19,6 +20,7 @@ const getEnvVar = (key: typeof requiredEnvVars[number]): string => {
}; };
export interface Config { export interface Config {
scoutUrl: string;
postgrestUrl: string; postgrestUrl: string;
automationUserJwt: string; automationUserJwt: string;
s3AccessKeyId: string; s3AccessKeyId: string;
@ -30,6 +32,7 @@ export interface Config {
export const configs: Config = { export const configs: Config = {
scoutUrl: getEnvVar('SCOUT_URL'),
postgrestUrl: getEnvVar('POSTGREST_URL'), postgrestUrl: getEnvVar('POSTGREST_URL'),
automationUserJwt: getEnvVar('AUTOMATION_USER_JWT'), automationUserJwt: getEnvVar('AUTOMATION_USER_JWT'),
s3AccessKeyId: getEnvVar('S3_ACCESS_KEY_ID'), s3AccessKeyId: getEnvVar('S3_ACCESS_KEY_ID'),

View File

@ -2,7 +2,7 @@ import type { Helpers } from 'graphile-worker'
import { configs } from '../config.ts' import { configs } from '../config.ts'
import querystring from 'node:querystring' import querystring from 'node:querystring'
export default async function createSegmentInDatabase(s3_key: string, vod_id: string, helpers: Helpers): Promise<number> { export default async function createSegmentInDatabase(s3_key: string, vod_id: string, helpers: Helpers): Promise<string> {
if (!s3_key) throw new Error('getSegments requires {string} s3_key as first arg'); if (!s3_key) throw new Error('getSegments requires {string} s3_key as first arg');
const segmentPayload = { const segmentPayload = {
s3_key, s3_key,
@ -34,5 +34,5 @@ export default async function createSegmentInDatabase(s3_key: string, vod_id: st
if (Array.isArray(segmentsId)) throw new Error('segmentsId was an array which is unexpected'); if (Array.isArray(segmentsId)) throw new Error('segmentsId was an array which is unexpected');
const id = segmentsId.split('.').at(-1) const id = segmentsId.split('.').at(-1)
if (!id) throw new Error('failed to get id '); if (!id) throw new Error('failed to get id ');
return parseInt(id) return id
} }

View File

@ -2,9 +2,10 @@ import type { Helpers } from 'graphile-worker'
import { configs } from '../config.ts' import { configs } from '../config.ts'
import querystring from 'node:querystring' import querystring from 'node:querystring'
export default async function createSegmentsVodLink(vod_id: string, segment_id: number, helpers: Helpers): Promise<number> { export default async function createSegmentsVodLink(vod_id: string, segment_id: string, helpers: Helpers): Promise<number> {
helpers.logger.info(`createSegmentsVodLink with vod_id=${vod_id}, segment_id=${segment_id}`)
if (!vod_id) throw new Error('createSegmentsVodLink requires {string} vod_id as first arg'); if (!vod_id) throw new Error('createSegmentsVodLink requires {string} vod_id as first arg');
if (!segment_id) throw new Error('createSegmentsVodLink requires {Number} segment_id as second arg'); if (!segment_id) throw new Error('createSegmentsVodLink requires {string} segment_id as second arg');
const segmentVodLinkPayload = { const segmentVodLinkPayload = {
vod_id, vod_id,
segment_id segment_id

View File

@ -0,0 +1,21 @@
import { configs } from '../config.ts'
export default async function getPlaylistUrl (url: string): Promise<string|null> {
if (!url) throw new Error(`getPlaylistUrl requires a url, but it was undefined.`);
const res = await fetch(`${configs.scoutUrl}/ytdlp/playlist-url?url=${url}`)
if (!res.ok) {
const body = await res.text()
console.error(`failed to getPlaylistUrl res.status=${res.status}, res.statusText=${res.statusText}, body=${body}`)
return null
} else {
const data = await res.json() as any
console.log(`>>>>>> getPlaylistUrl got a data payload as follows`)
console.log(data)
if (!!data.error) {
return null;
} else {
return data.data.url
}
}
}

View File

@ -1,22 +1,32 @@
import type { Segment } from '@futureporn/types' import type { SegmentResponse } from '@futureporn/types'
import type { Helpers } from 'graphile-worker' import type { Helpers } from 'graphile-worker'
import { configs } from '../config.ts' import { configs } from '../config.ts'
/**
* updateSegmentInDatabase
*
* updates the segment in the database with the new filesize
*
* resolves with the updated segment and the is_recording_aborted column of the related vod
*/
export default async function updateSegmentInDatabase({ export default async function updateSegmentInDatabase({
segment_id, segment_id,
fileSize, fileSize,
helpers helpers
}: { }: {
segment_id: number, segment_id: string,
fileSize: number, fileSize: number,
helpers: Helpers helpers: Helpers
}): Promise<Segment> { }): Promise<SegmentResponse> {
const payload: any = { const payload: any = {
bytes: fileSize bytes: fileSize
} }
const res = await fetch(`${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=stream:streams(is_recording_aborted)`, { const fetchUrl =`${configs.postgrestUrl}/segments?id=eq.${segment_id}&select=vod:vods(is_recording_aborted)`
// helpers.logger.info(`updateSegmentInDatabase > fetchUrl=${fetchUrl}`)
const res = await fetch(fetchUrl, {
method: 'PATCH', method: 'PATCH',
headers: { headers: {
'Content-Type': 'application/json', 'Content-Type': 'application/json',
@ -28,12 +38,12 @@ export default async function updateSegmentInDatabase({
}) })
if (!res.ok) { if (!res.ok) {
const body = await res.text() const body = await res.text()
const msg = `failed to updateDatabaseRecord. status=${res.status}, statusText=${res.statusText}, body=${body}` const msg = `failed to updateSegmentInDatabase. status=${res.status}, statusText=${res.statusText}, body=${body}`
helpers.logger.error(msg) helpers.logger.error(msg)
throw new Error(msg); throw new Error(msg);
} }
// helpers.logger.info(`response was OK~`) // helpers.logger.info(`response was OK~`)
const body = await res.json() as Segment[]; const body = await res.json() as SegmentResponse[];
if (!body[0]) throw new Error(`failed to get a segment that matched segment_id=${segment_id}`); if (!body[0]) throw new Error(`failed to get a segment that matched segment_id=${segment_id}`);
const bod = body[0] const bod = body[0]
// helpers.logger.info('the following was the response from PATCH-ing /segments') // helpers.logger.info('the following was the response from PATCH-ing /segments')

View File

@ -3,12 +3,13 @@
import updateSegmentInDatabase from '../fetchers/updateSegmentInDatabase.ts' import updateSegmentInDatabase from '../fetchers/updateSegmentInDatabase.ts'
import { Helpers, type Task } from 'graphile-worker' import { Helpers, type Task } from 'graphile-worker'
import Record from '../Record.ts' import Record from '../Record.ts'
import { getPlaylistUrl } from '@futureporn/scout/ytdlp.ts' import type { SegmentResponse, ScoutResponse } from '@futureporn/types'
import type { Segment } from '@futureporn/types'
import { configs } from '../config.ts' import { configs } from '../config.ts'
import { createId } from '@paralleldrive/cuid2' import { createId } from '@paralleldrive/cuid2'
import createSegmentInDatabase from '../fetchers/createSegmentInDatabase.ts' import createSegmentInDatabase from '../fetchers/createSegmentInDatabase.ts'
import createSegmentsVodLink from '../fetchers/createSegmentsVodLink.ts' import createSegmentsVodLink from '../fetchers/createSegmentsVodLink.ts'
import getPlaylistUrl from '../fetchers/getPlaylistUrl.ts'
import { String } from 'aws-sdk/clients/acm'
/** /**
* url is the URL to be recorded. Ex: chaturbate.com/projektmelody * url is the URL to be recorded. Ex: chaturbate.com/projektmelody
@ -29,7 +30,8 @@ function assertPayload(payload: any): asserts payload is Payload {
} }
async function getRecordInstance(url: string, segment_id: number, helpers: Helpers) { async function getRecordInstance(url: string, segment_id: string, helpers: Helpers) {
helpers.logger.info(`getRecordInstance() with url=${url}, segment_id=${segment_id}`)
const abortController = new AbortController() const abortController = new AbortController()
const abortSignal = abortController.signal const abortSignal = abortController.signal
const accessKeyId = configs.s3AccessKeyId; const accessKeyId = configs.s3AccessKeyId;
@ -38,6 +40,8 @@ async function getRecordInstance(url: string, segment_id: number, helpers: Helpe
const endpoint = configs.s3Endpoint; const endpoint = configs.s3Endpoint;
const bucket = configs.s3Bucket; const bucket = configs.s3Bucket;
const playlistUrl = await getPlaylistUrl(url) const playlistUrl = await getPlaylistUrl(url)
if (!playlistUrl) throw new Error('failed to getPlaylistUrl');
helpers.logger.info(`playlistUrl=${playlistUrl}`)
const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint }) const s3Client = Record.makeS3Client({ accessKeyId, secretAccessKey, region, endpoint })
const inputStream = Record.getFFmpegStream({ url: playlistUrl }) const inputStream = Record.getFFmpegStream({ url: playlistUrl })
const onProgress = (fileSize: number) => { const onProgress = (fileSize: number) => {
@ -55,8 +59,10 @@ async function getRecordInstance(url: string, segment_id: number, helpers: Helpe
return record return record
} }
function checkIfAborted(segment: Partial<Segment>): boolean { function checkIfAborted(segment: Partial<SegmentResponse>): boolean {
return (!!segment?.stream?.at(0)?.is_recording_aborted) // console.log(`checkIfAborted with following segment`)
// console.log(segment)
return (!!segment?.vod?.is_recording_aborted)
} }
@ -69,11 +75,11 @@ function checkIfAborted(segment: Partial<Segment>): boolean {
* *
* Ideally, we record the entire livestream, but the universe is not so kind. Network interruptions are common, so we handle the situation as best as we can. * Ideally, we record the entire livestream, but the universe is not so kind. Network interruptions are common, so we handle the situation as best as we can.
* *
* This function creates a new segments and segments_streams_links entry in the db via Postgrest REST API. * This function creates a new segments and vods_segments_links entry in the db via Postgrest REST API.
* *
* This function also names the S3 file (s3_key) with a datestamp and a cuid. * This function also names the S3 file (s3_key) with a datestamp and a cuid.
*/ */
const doRecordSegment = async function doRecordSegment(url: string, vod_id: string, helpers: Helpers): Promise<void> { const doRecordSegment = async function doRecordSegment(url: string, vod_id: string, helpers: Helpers) {
const s3_key = `${new Date().toISOString()}-${createId()}.ts` const s3_key = `${new Date().toISOString()}-${createId()}.ts`
helpers.logger.info(`let's create a segment using vod_id=${vod_id}, url=${url}`) helpers.logger.info(`let's create a segment using vod_id=${vod_id}, url=${url}`)
const segment_id = await createSegmentInDatabase(s3_key, vod_id, helpers) const segment_id = await createSegmentInDatabase(s3_key, vod_id, helpers)
@ -81,7 +87,7 @@ const doRecordSegment = async function doRecordSegment(url: string, vod_id: stri
const segmentsVodLinkId = await createSegmentsVodLink(vod_id, segment_id, helpers) const segmentsVodLinkId = await createSegmentsVodLink(vod_id, segment_id, helpers)
helpers.logger.info(`doTheRecording with createSegmentsVodLink segmentsVodLinkId=${segmentsVodLinkId}, vod_id=${vod_id}, segment_id=${segment_id}, url=${url}`) helpers.logger.info(`doTheRecording with createSegmentsVodLink segmentsVodLinkId=${segmentsVodLinkId}, vod_id=${vod_id}, segment_id=${segment_id}, url=${url}`)
const record = await getRecordInstance(url, segment_id, helpers) const record = await getRecordInstance(url, segment_id, helpers)
await record.start() return record.start()
} }

View File

@ -0,0 +1,24 @@
import { configs } from "../config"
import { Helpers } from "graphile-worker"
import { Stream } from "@futureporn/types"
export default async function getVod(vodId: string, helpers: Helpers) {
const url = `${configs.postgrestUrl}/streams?select=*,segments(*)&id=eq.${vodId}`
try {
const res = await fetch(url)
if (!res.ok) {
throw new Error(`failed fetching stream ${vodId}. status=${res.status}, statusText=${res.statusText}`)
}
const body = await res.json() as Stream[]
if (!body[0]) throw new Error('body[0] was expected to be Stream data, but it was either null or undefined.');
return body[0];
} catch (e) {
helpers.logger.error(`encountered an error during getStreamFromDatabase()`)
if (e instanceof Error) {
helpers.logger.error(e.message)
} else {
helpers.logger.error(JSON.stringify(e))
}
return null
}
}

View File

@ -22,7 +22,6 @@ interface s3File {
interface Payload { interface Payload {
s3_manifest: s3File[]; s3_manifest: s3File[];
stream_id?: string;
vod_id?: string; vod_id?: string;
} }
@ -177,10 +176,9 @@ export const combine_video_segments: Task = async function (payload: unknown, he
// helpers.logger.info(payload) // helpers.logger.info(payload)
// helpers.logger.info(JSON.stringify(payload?.s3_manifest)) // helpers.logger.info(JSON.stringify(payload?.s3_manifest))
assertPayload(payload) assertPayload(payload)
const { s3_manifest, vod_id, stream_id } = payload const { s3_manifest, vod_id } = payload
if (!stream_id) helpers.logger.warn(`combine_video_segments was called without a stream_id. This is not recommended.`);
if (!vod_id) helpers.logger.warn(`combine_video_segments was called without a vod_id. This is not recommended.`); if (!vod_id) helpers.logger.warn(`combine_video_segments was called without a vod_id. This is not recommended.`);
helpers.logger.info(`combine_video_segments started with s3_manifest=${JSON.stringify(s3_manifest)}, vod_id=${vod_id}, stream_id=${stream_id}`) helpers.logger.info(`combine_video_segments started with s3_manifest=${JSON.stringify(s3_manifest)}, vod_id=${vod_id}`)
/** /**
* Here we take a manifest of S3 files and we download each of them. * Here we take a manifest of S3 files and we download each of them.
@ -217,7 +215,7 @@ export const combine_video_segments: Task = async function (payload: unknown, he
await upload.done() await upload.done()
if (vod_id && stream_id) { if (vod_id) {
// update the vod with the s3_file of the combined video // update the vod with the s3_file of the combined video
const s3File: S3File = { const s3File: S3File = {
s3_key: s3KeyName, s3_key: s3KeyName,

View File

@ -2,37 +2,18 @@ import type { Helpers, Task } from "graphile-worker"
import { configs } from "../config" import { configs } from "../config"
import type { Stream } from '@futureporn/types' import type { Stream } from '@futureporn/types'
import createVod from "../fetchers/createVod" import createVod from "../fetchers/createVod"
import getVod from "../fetchers/getVod"
interface Payload { interface Payload {
stream_id: string; vod_id: string;
} }
function assertPayload(payload: any): asserts payload is Payload { function assertPayload(payload: any): asserts payload is Payload {
if (typeof payload !== "object" || !payload) throw new Error("invalid payload (it must be an object)"); if (typeof payload !== "object" || !payload) throw new Error("invalid payload (it must be an object)");
if (typeof payload.stream_id !== "string") throw new Error("payload.stream_id was not a string"); if (typeof payload.vod_id !== "string") throw new Error("payload.vod_id was not a string");
} }
async function getStreamFromDatabase(streamId: string, helpers: Helpers) {
const url = `${configs.postgrestUrl}/streams?select=*,segments(*)&id=eq.${streamId}`
try {
const res = await fetch(url)
if (!res.ok) {
throw new Error(`failed fetching stream ${streamId}. status=${res.status}, statusText=${res.statusText}`)
}
const body = await res.json() as Stream[]
if (!body[0]) throw new Error('body[0] was expected to be Stream data, but it was either null or undefined.');
return body[0];
} catch (e) {
helpers.logger.error(`encountered an error during getStreamFromDatabase()`)
if (e instanceof Error) {
helpers.logger.error(e.message)
} else {
helpers.logger.error(JSON.stringify(e))
}
return null
}
}
/** /**
@ -40,11 +21,11 @@ async function getStreamFromDatabase(streamId: string, helpers: Helpers) {
* # process_video * # process_video
* *
* We just recorded a livestream. Now what? * We just recorded a livestream. Now what?
* process_video takes a /streams record and runs a bunch of processes to get it ready for publishing. * process_video takes a /vods record and runs a bunch of processes to get it ready for publishing.
* *
* The following are graphile-worker tasks which process_video is responsible for adding to the job queue. * The following are graphile-worker tasks which process_video is responsible for adding to the job queue.
* Some of these tasks are run conditionally based on the structure of the /streams record. * Some of these tasks are run conditionally based on the structure of the /vods record.
* For example, combine_video_segments is only useful on a stream recording which ended up with multiple segments. * For example, combine_video_segments is only useful on a vod recording which ended up with multiple segments.
* *
* - combine_video_segments * - combine_video_segments
* - generate_thumbnail * - generate_thumbnail
@ -67,25 +48,19 @@ async function getStreamFromDatabase(streamId: string, helpers: Helpers) {
*/ */
const process_video: Task = async function (payload: unknown, helpers: Helpers) { const process_video: Task = async function (payload: unknown, helpers: Helpers) {
assertPayload(payload) assertPayload(payload)
const { stream_id } = payload const { vod_id } = payload
helpers.logger.info(`process_video task has begun for stream_id=${stream_id}`) helpers.logger.info(`process_video task has begun for vod_id=${vod_id}`)
const stream = await getStreamFromDatabase(stream_id, helpers) const vod = await getVod(vod_id, helpers)
if (!stream) throw new Error(`failed to get stream from database.`); if (!vod) throw new Error(`failed to get vod from database.`);
if (!stream.segments) throw new Error(`stream ${stream_id} fetched from database lacked any segments.`); if (!vod.segments) throw new Error(`vod ${vod_id} fetched from database lacked any segments.`);
const isVodPresent: boolean = !!(stream?.vods && stream.vods.length > 0) const isCombinationNeeded = (vod.segments.length > 1)
if (isCombinationNeeded) {
if (!isVodPresent) { const s3_manifest = vod.segments.map((segment) => ({ key: segment.s3_key }))
const vod = await createVod(stream) helpers.addJob('combine_video_segments', { s3_manifest, vod_id })
if (!vod) throw new Error('failed to create vod')
const vod_id = vod.id
const isCombinationNeeded = (stream.segments.length > 1)
if (isCombinationNeeded) {
const s3_manifest = stream.segments.map((segment) => ({ key: segment.s3_key }))
helpers.addJob('combine_video_segments', { s3_manifest, vod_id, stream_id })
}
} }
} }
export default process_video; export default process_video;

View File

@ -0,0 +1,3 @@
ALTER TABLE api.vods
ADD COLUMN is_recording_aborted BOOLEAN;

View File

@ -0,0 +1,6 @@
ALTER TABLE api.segments
DROP COLUMN vod_id;
ALTER TABLE api.segments
ADD COLUMN vod_id UUID REFERENCES api.vods(id);

View File

@ -0,0 +1,23 @@
DROP FUNCTION update_stream_on_segment_update CASCADE;
CREATE OR REPLACE FUNCTION update_vod_on_segment_update()
RETURNS TRIGGER AS $$
BEGIN
UPDATE api.vods
SET updated_at = NOW()
WHERE id IN (
SELECT vod_id
FROM segments_vod_links
WHERE segment_id = NEW.id
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trigger_update_vod
AFTER UPDATE ON api.segments
FOR EACH ROW
EXECUTE FUNCTION update_vod_on_segment_update();

View File

@ -0,0 +1,6 @@
ALTER TABLE api.segments
DROP COLUMN id;
ALTER TABLE api.segments
ADD COLUMN id uuid PRIMARY KEY DEFAULT gen_random_uuid();

View File

@ -0,0 +1,10 @@
-- we need vod_id and segment_id to be uuid, not text
DROP TABLE api.segments_vod_links;
CREATE TABLE api.segments_vod_links (
id UUID DEFAULT gen_random_uuid(),
vod_id UUID NOT NULL REFERENCES api.vods(id),
segment_id UUID NOT NULL REFERENCES api.segments(id),
PRIMARY KEY(id, vod_id, segment_id)
);

View File

@ -0,0 +1,10 @@
-- In the last migration, I accidentally created a many-to-many relationship.
-- What I actually need is a many-to-one relationship.
DROP TABLE api.segments_vod_links;
CREATE TABLE api.segments_vod_links (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
vod_id UUID NOT NULL REFERENCES api.vods(id),
segment_id UUID NOT NULL REFERENCES api.segments(id)
);

View File

@ -0,0 +1,5 @@
ALTER TABLE api.vods
DROP COLUMN is_recording_aborted;
ALTER TABLE api.vods
ADD COLUMN is_recording_aborted BOOLEAN DEFAULT FALSE;

View File

@ -0,0 +1,3 @@
-- roles & permissions
GRANT all ON api.segments_vod_links TO automation;
GRANT SELECT ON api.segments_vod_links TO web_anon;

View File

@ -0,0 +1,4 @@
-- vods needs discord_message_id for chatops
ALTER TABLE api.vods
ADD COLUMN discord_message_id TEXT;

View File

@ -0,0 +1,3 @@
ALTER TABLE api.vods
ADD COLUMN status TEXT;

View File

@ -0,0 +1,21 @@
-- 'NEW' in this context is a segment row.
-- we update this function to also set the vod status to 'recording' if applicable.
CREATE OR REPLACE FUNCTION update_vod_on_segment_update()
RETURNS TRIGGER AS $$
BEGIN
UPDATE api.vods
SET
updated_at = NOW(),
status = CASE
WHEN NEW.filesize > OLD.filesize THEN 'recording'
ELSE status
END
WHERE id IN (
SELECT vod_id
FROM segments_vod_links
WHERE segment_id = NEW.id
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

View File

@ -0,0 +1,21 @@
-- 'NEW' in this context is a segment row.
-- we update this function to also set the vod status to 'recording' if applicable.
CREATE OR REPLACE FUNCTION update_vod_on_segment_update()
RETURNS TRIGGER AS $$
BEGIN
UPDATE api.vods
SET
updated_at = NOW(),
status = CASE
WHEN NEW.bytes > OLD.bytes THEN 'recording'
ELSE status
END
WHERE id IN (
SELECT vod_id
FROM segments_vod_links
WHERE segment_id = NEW.id
);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

View File

@ -0,0 +1,5 @@
ALTER TABLE api.vods
DROP COLUMN status;
ALTER TABLE api.vods
ADD COLUMN status TEXT DEFAULT 'pending_recording';

View File

@ -58,7 +58,9 @@
"slugify": "^1.6.6", "slugify": "^1.6.6",
"swagger-editor-dist": "^4.13.1", "swagger-editor-dist": "^4.13.1",
"swagger-ui-dist": "^5.17.14", "swagger-ui-dist": "^5.17.14",
"ts-json-schema-generator": "^2.3.0",
"tsx": "^4.18.0", "tsx": "^4.18.0",
"typescript-json-schema": "^0.65.1",
"xpath": "^0.0.34" "xpath": "^0.0.34"
}, },
"packageManager": "pnpm@9.6.0", "packageManager": "pnpm@9.6.0",
@ -66,16 +68,13 @@
"@babel/preset-env": "^7.25.4", "@babel/preset-env": "^7.25.4",
"@babel/preset-typescript": "^7.24.7", "@babel/preset-typescript": "^7.24.7",
"@futureporn/utils": "workspace:^", "@futureporn/utils": "workspace:^",
"@jest/globals": "^29.7.0",
"@types/chai": "^4.3.18", "@types/chai": "^4.3.18",
"@types/cheerio": "^0.22.35", "@types/cheerio": "^0.22.35",
"@types/jest": "^29.5.12",
"@types/mailparser": "^3.4.4", "@types/mailparser": "^3.4.4",
"@types/mocha": "^10.0.7", "@types/mocha": "^10.0.7",
"@types/sinon": "^17.0.3", "@types/sinon": "^17.0.3",
"chai": "^5.1.1", "chai": "^5.1.1",
"esmock": "^2.6.7", "esmock": "^2.6.7",
"jest": "^29.7.0",
"mocha": "^10.7.3", "mocha": "^10.7.3",
"nodemon": "^3.1.4", "nodemon": "^3.1.4",
"sinon": "^15.2.0", "sinon": "^15.2.0",

File diff suppressed because it is too large Load Diff

View File

@ -1,106 +0,0 @@
import Fastify from 'fastify'
import fastifySwagger from '@fastify/swagger'
import fastifySwaggerUi from '@fastify/swagger-ui'
import { readFileSync } from 'node:fs'
import { fileURLToPath } from 'url'
import { dirname, join } from 'node:path'
const __dirname = dirname(fileURLToPath(import.meta.url));
const swaggerDarkCss = readFileSync(join(__dirname, './css/SwaggerDark.css'), { encoding: 'utf-8' })
async function fastifySetup(configs) {
const fastify = Fastify({
logger: true
})
await fastify.register(fastifySwagger)
await fastify.register(fastifySwaggerUi, {
theme: {
title: '@futureporn/scout',
css: [
{ filename: 'SwaggerDark.css', content: swaggerDarkCss }
]
},
routePrefix: '/',
uiConfig: {
docExpansion: 'full',
deepLinking: false
},
uiHooks: {
onRequest: function (request, reply, next) { next() },
preHandler: function (request, reply, next) { next() }
},
staticCSP: true,
transformStaticCSP: (header) => header,
transformSpecification: (swaggerObject, request, reply) => { return swaggerObject },
transformSpecificationClone: true
})
fastify.put('/some-route/:id', {
schema: {
description: 'post some data',
tags: ['vtuber'],
summary: 'qwerty',
params: {
type: 'object',
properties: {
id: {
type: 'string',
description: 'user id'
}
}
},
body: {
type: 'object',
properties: {
hello: { type: 'string' },
obj: {
type: 'object',
properties: {
some: { type: 'string' }
}
}
}
},
response: {
201: {
description: 'Successful response',
type: 'object',
properties: {
hello: { type: 'string' }
}
},
default: {
description: 'Default response',
type: 'object',
properties: {
foo: { type: 'string' }
}
}
},
}
}, (req, reply) => {
reply.send('HAHAHAHAHAH')
})
fastify.listen({ port: configs.port }, function (err, address) {
console.log(`@futureporn/scout listening on ${address}`)
if (err) {
fastify.log.error(err)
process.exit(1)
}
})
}
export default fastifySetup

View File

@ -0,0 +1,159 @@
import Fastify, { FastifyPluginCallback, FastifyReply, FastifyRequest, FastifyReplyContext, FastifyPluginAsync, FastifyServerOptions } from 'fastify'
import fastifySwagger from '@fastify/swagger'
import fastifySwaggerUi from '@fastify/swagger-ui'
import { readFileSync } from 'node:fs'
import { fileURLToPath } from 'url'
import { dirname, join } from 'node:path'
import { Config } from './config'
import { VtuberRecord, VtuberResponse, VtuberDataScrape } from './schemas.ts'
import scrapeVtuberData from './scrapeVtuberData.ts'
import { getPlaylistUrl } from './ytdlp.ts'
import { getRandomRoom } from './cb.ts'
import { getPackageVersion } from '@futureporn/utils'
type VtuberDataRequest = FastifyRequest<{
Querystring: { url: string }
}>
const __dirname = dirname(fileURLToPath(import.meta.url));
const swaggerDarkCss = readFileSync(join(__dirname, './css/SwaggerDark.css'), { encoding: 'utf-8' })
const version = getPackageVersion(join(__dirname, '../package.json'))
async function fastifySetup(configs: Config) {
const fastify = Fastify({
logger: true
})
await fastify.register(fastifySwagger, {
openapi: {
info: {
title: '@futureporn/scout',
description: 'Vtuber data acquisition API',
version
},
}
})
await fastify.register(fastifySwaggerUi, {
theme: {
title: '@fp/scout',
css: [
{ filename: 'SwaggerDark.css', content: swaggerDarkCss }
]
},
routePrefix: '/',
uiConfig: {
docExpansion: 'list',
deepLinking: true
},
uiHooks: {
onRequest: function (request: FastifyRequest, reply: FastifyReply, next: any) { next() },
preHandler: function (request: FastifyRequest, reply: FastifyReply, next: any) { next() }
},
staticCSP: true,
transformStaticCSP: (header: any) => header,
transformSpecification: (swaggerObject: any, request: FastifyRequest, reply: FastifyReply) => { return swaggerObject },
transformSpecificationClone: true
})
fastify.addSchema(VtuberResponse)
fastify.addSchema(VtuberRecord)
fastify.addSchema(VtuberDataScrape)
fastify.get('/chaturbate/random-room', {
schema: {
response: {
'2xx': {
type: 'object'
}
},
tags: ['chaturbate']
}
}, async (req, reply) => {
const room = await getRandomRoom()
console.log(room)
reply.type('application/json').send(JSON.stringify(room))
})
fastify.get('/ytdlp/playlist-url', {
schema: {
querystring: {
type: 'object',
properties: {
url: {
type: 'string'
}
}
},
response: {
'2xx': {
error: { type: 'boolean' },
message: { type: 'string' },
data: { type: 'object', properties: {
url: { type: 'string' }
}}
}
},
tags: ['yt-dlp']
}
}, async (req: VtuberDataRequest, reply) => {
try {
const playlistUrl = await getPlaylistUrl(req.query.url)
console.log(`playlistUrl=${playlistUrl}`)
reply.type('application/json').send(JSON.stringify({ data: { url: playlistUrl } }))
} catch (e) {
reply.type('application/json').send(JSON.stringify({ data: null, error: e }))
}
})
fastify.get('/vtuber/data', {
schema: {
querystring: {
type: 'object',
properties: {
url: {
type: 'string',
description: 'URL of a vtuber profile on Chaturbate or Fansly. ex: https://chaturbate.com/projektmelody'
}
}
},
response: {
'2xx': { $ref: 'VtuberDataScrape' }
},
tags: [
'vtuber'
]
}
}, async (req: VtuberDataRequest, reply) => {
console.log(`we received a request with url=${req.query.url}`)
const data = await scrapeVtuberData(req.query.url)
reply.type('application/json').send(data)
})
fastify.listen({ host: '0.0.0.0', port: configs.port }, function (err, address) {
console.log(`@futureporn/scout listening on ${address}`)
if (err) {
fastify.log.error(err)
process.exit(1)
}
})
}
export default fastifySetup

View File

@ -3,3 +3,4 @@ import fastify from './fastify.js'
import { configs } from './config.js' import { configs } from './config.js'
fastify(configs) fastify(configs)

View File

@ -0,0 +1,28 @@
/**
* schemas.ts
*
* uses ts-json-schema-generator to generate json schema from typescript declarations.
*
*/
import tsj, { Config } from 'ts-json-schema-generator'
import { createRequire } from 'node:module'
const require = createRequire(import.meta.url);
const typesIndex = require.resolve('@futureporn/types/src/index.ts');
const typesTsConfig = require.resolve('@futureporn/types/tsconfig.json');
const config: Config = {
path: typesIndex,
tsconfig: typesTsConfig,
type: '*',
skipTypeCheck: true,
encodeRefs: true,
topRef: false,
}
export const VtuberResponse = tsj.createGenerator(Object.assign(config, { schemaId: 'VtuberResponse' })).createSchema('VtuberResponse')
export const VtuberRecord = tsj.createGenerator(Object.assign(config, { schemaId: 'VtuberRecord' })).createSchema('VtuberRecord')
export const VtuberDataScrape = tsj.createGenerator(Object.assign(config, { schemaId: 'VtuberDataScrape' })).createSchema('VtuberDataScrape')
// export const VtuberDataScrape = tsj.createGenerator(Object.assign(config, { schemaId: 'VtuberDataScrape' })).createSchema('VtuberDataScrape')

View File

@ -1,4 +1,4 @@
import type { VtuberRecord } from "@futureporn/types" import type { VtuberDataScrape } from "@futureporn/types"
import { fetchHtml, getBroadcasterDisplayName, getInitialRoomDossier } from './cb.ts' import { fetchHtml, getBroadcasterDisplayName, getInitialRoomDossier } from './cb.ts'
import { getAccountData, usernameRegex } from "./fansly.ts" import { getAccountData, usernameRegex } from "./fansly.ts"
import { fpSlugify } from "@futureporn/utils" import { fpSlugify } from "@futureporn/utils"
@ -10,9 +10,9 @@ import { fpSlugify } from "@futureporn/utils"
* *
* The purpose of this function is to retrieve enough data about a vtuber in order for us to create a database record about them. * The purpose of this function is to retrieve enough data about a vtuber in order for us to create a database record about them.
*/ */
export default async function scrapeVtuberData(url: string): Promise<Partial<VtuberRecord>> { export default async function scrapeVtuberData(url: string): Promise<VtuberDataScrape> {
let display_name, chaturbate_id, slug, fansly_id let display_name, chaturbate_id, slug, fansly_id, chaturbate, fansly
if (url.match(/chaturbate/)) { if (url.match(/chaturbate/)) {
console.log('url matches chaturbate') console.log('url matches chaturbate')
let html = await fetchHtml(url) let html = await fetchHtml(url)
@ -22,6 +22,7 @@ export default async function scrapeVtuberData(url: string): Promise<Partial<Vtu
slug = dossier.broadcaster_username slug = dossier.broadcaster_username
chaturbate_id = dossier.broadcaster_uid chaturbate_id = dossier.broadcaster_uid
display_name = getBroadcasterDisplayName(html) display_name = getBroadcasterDisplayName(html)
chaturbate = url
if (!display_name) throw new Error('failed to get broadcaster display name from chaturbate'); if (!display_name) throw new Error('failed to get broadcaster display name from chaturbate');
// console.log(dossier) // console.log(dossier)
} else if (url.match(/fansly/)) { } else if (url.match(/fansly/)) {
@ -32,6 +33,7 @@ export default async function scrapeVtuberData(url: string): Promise<Partial<Vtu
slug = fpSlugify(data.username) slug = fpSlugify(data.username)
display_name = data?.username display_name = data?.username
fansly_id = data.id fansly_id = data.id
fansly = url
} else { } else {
throw new Error('The URL does not match a known platform'); throw new Error('The URL does not match a known platform');
} }
@ -40,7 +42,9 @@ export default async function scrapeVtuberData(url: string): Promise<Partial<Vtu
return { return {
display_name, display_name,
chaturbate_id, chaturbate_id,
chaturbate,
fansly_id, fansly_id,
fansly,
slug, slug,
} }

View File

@ -30,7 +30,7 @@ export class RoomOfflineError extends Error {
export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = 0): Promise<string> { export async function getPlaylistUrl (roomUrl: string, proxy = false, retries = 0): Promise<string|null> {
console.log(`getPlaylistUrl roomUrl=${roomUrl}, proxy=${false}, retries=${retries}`) console.log(`getPlaylistUrl roomUrl=${roomUrl}, proxy=${false}, retries=${retries}`)
let args = ['-g', roomUrl] let args = ['-g', roomUrl]
if (proxy) { if (proxy) {