futureporn-scout/index.ts

384 lines
12 KiB
TypeScript
Executable File

import Room from './src/Room.js';
import { loggerFactory } from "./src/logger.js";
import { assertYtdlpExistence } from './src/ytdlp.js';
import * as faye from './src/faye.js';
import * as sound from './src/sound.js';
import * as cb from './src/cb.js';
import { IAppContext, appEnv, getAppContext } from './src/appContext.js';
import { getSuperRealtimeClient } from './src/realtime.js';
import * as Ably from 'ably/promises.js';
import express, { Express, Request, Response } from 'express';
import { z } from 'zod'
import {
onCbMessage,
onCbTitle,
onCbSilence,
onCbStatus,
onCbNotice,
onCbTip,
onCbPassword,
} from './src/cbCallbacks.js'
import ChaturbateAuth from './src/ChaturbateAuth.js'
import fs from 'node:fs'
import Database from 'better-sqlite3';
import os from 'node:os'
import path from 'node:path'
import yargs from 'yargs'
import { hideBin } from 'yargs/helpers'
import $fastq from 'fastq';
import gotClient from './src/gotClient.js'
import { signalStart } from './src/faye.js'
import { getPushServiceAuth } from './src/headless.js'
/**
 * Shape of a room record.
 *
 * NOTE(review): presumably mirrors a row of the `rooms` table that
 * getRoomsFromDb() queries (`name`, `id`, `monitor`) — confirm against the schema.
 */
interface IRoomRecord {
// Room name.
name: string;
// Room identifier.
id: string;
// Numeric monitor flag. NOTE(review): presumably non-zero means "watch this room"
// (the rooms query filters on `monitor = TRUE`) — confirm.
monitor: number;
}
/** The `promise` member of the fastq import. */
const { promise: fastq } = $fastq
/** Millisecond timestamp of the Unix epoch; default lower bound for `--since`. */
const epoch = new Date(0).getTime()
/** Millisecond timestamp captured at module load; default upper bound for `--until`. */
const now = Date.now()
/**
 * Resolve the per-user data directory, creating it (and any missing parents)
 * when it does not exist yet.
 *
 * The directory is `<home>/.local/share/futureporn-scout`.
 *
 * @returns The path of the data directory.
 * @throws If the directory cannot be created, e.g. because of permissions or
 *   because something that is not a directory already occupies the path.
 */
async function getDataDir(): Promise<string> {
  const dataDir = path.join(os.homedir(), '.local', 'share', 'futureporn-scout');
  // With `recursive: true`, mkdir succeeds when the directory already exists,
  // so no separate existence probe is needed. This avoids the check-then-create
  // race and no longer treats every access() failure as "directory missing".
  await fs.promises.mkdir(dataDir, { recursive: true });
  return dataDir;
}
/**
 * Bootstrap everything the commands share and bundle it into an app context.
 *
 * Steps, in order: resolve the data directory, verify yt-dlp via
 * assertYtdlpExistence(), open `scout.db` inside the data directory with
 * `journal_mode = WAL`, build the logger, then hand everything to getAppContext().
 *
 * @returns Whatever getAppContext() returns.
 */
async function init() {
  const dataDir = await getDataDir()
  const databaseFile = path.join(dataDir, 'scout.db')

  // Checked before the database is opened.
  await assertYtdlpExistence()

  const db = new Database(databaseFile)
  db.pragma('journal_mode = WAL')

  const loggerOptions = { defaultMeta: { service: 'futureporn/scout' } }
  const logger = loggerFactory(loggerOptions)

  return getAppContext(appEnv, logger, db, sound, cb, dataDir, gotClient)
}
/**
 * Subscribe to a room's status topic on the realtime service.
 *
 * Fetches push-service auth for `room.url`, builds a realtime client for the
 * room, and — once the client reports `connected` — subscribes to the channel
 * mapped from `RoomStatusTopic#RoomStatusTopic:<room.id>`. Every received
 * message is printed and forwarded to `room.onStatus`.
 *
 * @param appContext application context; only `appContext.logger` is used here
 * @param room room object providing `name`, `id`, `dossier`, `url` and `onStatus`
 * @returns the realtime client, after `connect()` has been requested
 *
 * @deprecated
 */
async function registerRoomStatusListeners(appContext, room) {
  // Small local shorthand; the logger is looked up at call time, as before.
  const log = (level, message) => appContext.logger.log({ level, message });

  console.log(`>> registering ${room.name}`);
  // Wait for id/dossier to settle (they may be pending promises) before using room.id.
  await Promise.allSettled([room.id, room.dossier]);

  const psa = await getPushServiceAuth(room.url);
  const channelMap = Room.getChannelMapFromPsa(psa);
  const realtimeClient = await getSuperRealtimeClient(room.name, psa);
  console.log(realtimeClient);

  const handleConnected = () => {
    log('info', `${room.name} CB Realtime Connected!`);
    log('debug', `connection.state=${realtimeClient.connection.state}`);

    const statusTopic = `RoomStatusTopic#RoomStatusTopic:${room.id}`;
    const channelName = Room.getRealtimeChannelNameFromCbChannelName(channelMap, statusTopic);

    realtimeClient.channels.get(channelName).subscribe((message) => {
      log('info', `got a message from realtime, as follows.`);
      console.log(message);
      room.onStatus(message);
    });

    // @todo title/tip subscriptions are intentionally not registered here; they
    // are meant to be added on demand when a room goes live or is monitored
    // manually via the API.
    // @see https://github.com/futureporn/futureporn-scout/issues/3
  };

  realtimeClient.connection.once('connected', handleConnected);
  realtimeClient.connect();
  return realtimeClient;
}
/**
 * Manually signal the start of a recording for one room.
 *
 * Initialises the app context, attaches a faye client to it, looks up the
 * room's playlist URL and passes all three to signalStart().
 *
 * @param room name of the room to record
 */
async function record(room: string) {
  const ctx = await init();
  ctx.faye = faye.fayeFactory(ctx);
  signalStart(ctx, room, await cb.getPlaylistUrl(ctx, room));
}
/**
 * Load the rooms flagged for monitoring.
 *
 * @param appContext application context whose `db` handle is queried
 * @returns every row (`name`, `id`) of `rooms` where `monitor = TRUE`
 */
async function getRoomsFromDb(appContext: IAppContext) {
  return appContext.db
    .prepare('SELECT name, id FROM rooms WHERE monitor = TRUE')
    .all();
}
/**
 * Register the REST routes on `appContext.expressApp` and start listening.
 *
 * Routes:
 *  - `GET /`               — liveness banner
 *  - `GET /rooms`          — names of the rooms currently in `appContext.rooms`
 *  - `POST /rooms?name=`   — add a room to the watchlist
 *  - `DELETE /rooms?name=` — remove a room from the watchlist
 *
 * The port comes from `process.env.PORT`, falling back to 3030.
 *
 * @param appContext application context providing `expressApp`, `rooms` and `logger`
 */
async function serveApi(appContext: IAppContext) {
  appContext.expressApp.get('/', function (req: Request, res: Response) {
    res.send('*futureporn-scout pisses on the floor*');
  });

  appContext.expressApp.get('/rooms', function (req: Request, res: Response) {
    res.status(200).json({ rooms: Object.keys(appContext.rooms) });
  });

  appContext.expressApp.post('/rooms', function (req: Request, res: Response) {
    const name = req.query.name as string;
    try {
      Room.roomNameSchema.parse(name);
    } catch (e) {
      return res.status(400).json({ code: 400, error: true, message: 'name must be sent as a query param' });
    }
    try {
      appContext.logger.log({ level: 'debug', message: `Creating room name=${name}` });
      createRoom(appContext, name)
    } catch (e) {
      return res.status(500).json({ code: 500, error: true, message: `failed to create ${name}. ${e}` });
    }
    res.send(`${name} added to watchlist.`);
  })

  appContext.expressApp.delete('/rooms', async function (req: Request, res: Response) {
    const name = req.query.name as string;
    try {
      Room.roomNameSchema.parse(name);
    } catch (e) {
      return res.status(400).json({ code: 400, error: true, message: 'name must be sent as a query param' });
    }
    try {
      // deleteRoom is async. Without `await`, a rejection bypasses this
      // try/catch: the client is told the delete succeeded and the process is
      // left with an unhandled rejection.
      await deleteRoom(appContext, name);
      return res.send(`${name} deleted from watchlist.`);
    } catch (e) {
      return res.status(500).json({ code: 500, error: true, message: `failed to delete. ${e}` });
    }
  })

  const port = process.env.PORT || 3030;
  appContext.expressApp.listen(port);
  appContext.logger.log({ level: 'info', message: `REST API server listening on port ${port}` });
}
/**
 * Add a room to the watchlist and start watching it.
 *
 * If the room is already in `appContext.rooms`, nothing new is created and the
 * existing room is returned, so the declared `Room` return type always holds.
 *
 * @param appContext application context holding `rooms` and `logger`
 * @param name name of the room to watch
 * @returns the room registered under `name` (newly created or pre-existing)
 */
function createRoom (appContext: IAppContext, name: string): Room {
  const existing = appContext.rooms[name];
  if (existing) {
    appContext.logger.log({ level: 'info', message: `${name} is already being watched.` });
    return existing;
  }

  appContext.logger.log({ level: 'info', message: `creating room ${name}`});
  // Each callback closes over the context and room name and forwards the
  // incoming message to the matching handler.
  const room = new Room(appContext, {
    name: name,
    id: null,
    onStatus: (m: Ably.Types.Message) => onCbStatus(appContext, name, m),
    onMessage: (m: Ably.Types.Message) => onCbMessage(appContext, name, m),
    onSilence: (m: Ably.Types.Message) => onCbSilence(appContext, name, m),
    onTitle: (m: Ably.Types.Message) => onCbTitle(appContext, name, m),
    onTip: (m: Ably.Types.Message) => onCbTip(appContext, name, m),
    onPassword: (m: Ably.Types.Message) => onCbPassword(appContext, name, m),
    onNotice: (m: Ably.Types.Message) => onCbNotice(appContext, name, m),
  })
  appContext.rooms[name] = room;
  room.startWatching();
  return room;
}
/**
 * Stop watching a room and remove it from the watchlist.
 *
 * When the room is not in `appContext.rooms`, a warning is logged and the call
 * resolves without doing anything else. Previously it fell through and called
 * `stopWatching()` on `undefined`, which threw a TypeError.
 *
 * @param appContext application context holding `rooms` and `logger`
 * @param roomName name of the room to remove
 */
async function deleteRoom (appContext: IAppContext, roomName: string): Promise<void> {
  const room = appContext.rooms[roomName];
  if (!room) {
    appContext.logger.log({ level: 'warn', message: `Cannot delete ${roomName} because it is not in the watchlist.` });
    return;
  }
  // Only drop the entry once the room has actually stopped watching.
  await room.stopWatching();
  delete appContext.rooms[roomName];
}
/**
 * Long-running mode: watch every room flagged for monitoring in the database
 * and expose the REST API.
 *
 * Initialises the app context, attaches a faye client, creates one room per
 * database record (logging each name), then calls serveApi().
 */
async function daemon() {
  const appContext: IAppContext = await init();
  appContext.faye = faye.fayeFactory(appContext);

  const watchlist = await getRoomsFromDb(appContext);
  appContext.logger.log({ level: 'info', message: `Watching the following rooms` });
  for (const { name } of watchlist) {
    appContext.logger.log({ level: 'info', message: `${name}` });
    // createRoom() also starts watching, so no separate start loop is needed.
    createRoom(appContext, name);
  }

  serveApi(appContext);
}
/**
 * Fetch a room's initial dossier and print it to stdout.
 *
 * @param roomName name of the room to look up
 */
async function dossier(roomName: string) {
  const appContext = await init()
  const room = new Room(appContext, { name: roomName })
  console.log(await room.getInitialRoomDossier())
}
/**
 * exportLogs
 *
 * Export chat logs into a format suitable for publishing.
 *
 * Not implemented yet: the function currently only initialises the app
 * context; `options` is accepted but not read.
 *
 * @param options parsed CLI arguments of the `export` command
 * @todo implement the export itself
 */
async function exportLogs(options) {
  await init()

  // Planned behaviour (kept from the original sketch):
  //  - `options.auto`: find the most recent stream from the lifecycle
  //    start/stop events, using a 5 minute threshold to separate streams.
  //  - otherwise select rows from `messages` for `_room = options.room`,
  //    bounded by `data_ts >= options.since` and/or `data_ts <= options.until`
  //    when given, ordered by `data_ts ASC`.
  //  - print / write the resulting messages.
  // When implementing, bind room/since/until as statement parameters rather
  // than interpolating them into the SQL string.
}
// Command-line entry point.
//
// Commands: daemon (d), dossier, record (r), export (e).
//
// Two key names matter to yargs and were previously swapped, so the shortcuts
// were silently ignored:
//   - command modules declare shortcuts under `aliases`
//   - option specs declare shortcuts under `alias`
//
// Handlers return the promise of the async command they start so that yargs
// can report a failure instead of leaving an unhandled rejection.
yargs(hideBin(process.argv))
  .command({
    command: 'daemon',
    aliases: ['d'],
    desc: 'Listen & log chaturbate events',
    builder: () => { },
    handler: (argv) => {
      console.info(argv)
      return daemon()
    }
  })
  .command({
    command: 'dossier',
    desc: `Get the room's dossier`,
    builder: (yargs) => {
      return yargs
        .option('room', {
          alias: 'r',
          describe: 'name of the CB room',
          required: true
        })
    },
    handler: (argv) => {
      console.info(argv)
      return dossier(argv.room)
    }
  })
  .command({
    command: 'record',
    aliases: ['r'],
    desc: 'Manually start recording a CB stream',
    builder: (yargs) => {
      return yargs
        .option('room', {
          alias: ['r', 'n'],
          describe: 'the name of the chaturbate room',
          required: true
        })
    },
    handler: (argv) => {
      return record(argv.room)
    }
  })
  .command({
    command: 'export',
    aliases: ['e'],
    desc: 'Export chat logs as JSON',
    builder: (yargs) => {
      return yargs
        .option('output', {
          alias: 'o',
          describe: 'output file location',
          required: true
        })
        .option('room', {
          alias: ['r', 'n'],
          describe: 'the name of the chaturbate room',
          required: true
        })
        .option('since', {
          alias: 's',
          describe: 'export logs since this date',
          default: epoch,
          required: false
        })
        .option('until', {
          alias: 'u',
          describe: 'export logs until this date',
          default: now,
          required: false
        })
        // `--auto` ("gets the most recent stream", alias `a`) is not registered
        // yet; the handler below already rejects combining it with a range.
    },
    handler: (argv) => {
      // since/until equal to their defaults means "no explicit range given".
      if (argv.auto && (argv.since !== epoch || argv.until !== now)) {
        throw new Error('cannot use --auto with --since or --until.')
      }
      console.info(argv)
      return exportLogs(argv)
    }
  })
  .demandCommand(1)
  .help()
  .parse()