384 lines
12 KiB
TypeScript
Executable File
384 lines
12 KiB
TypeScript
Executable File
|
|
|
|
import Room from './src/Room.js';
|
|
import { loggerFactory } from "./src/logger.js";
|
|
import { assertYtdlpExistence } from './src/ytdlp.js';
|
|
import * as faye from './src/faye.js';
|
|
import * as sound from './src/sound.js';
|
|
import * as cb from './src/cb.js';
|
|
import { IAppContext, appEnv, getAppContext } from './src/appContext.js';
|
|
import { getSuperRealtimeClient } from './src/realtime.js';
|
|
import * as Ably from 'ably/promises.js';
|
|
import express, { Express, Request, Response } from 'express';
|
|
import { z } from 'zod'
|
|
|
|
import {
|
|
onCbMessage,
|
|
onCbTitle,
|
|
onCbSilence,
|
|
onCbStatus,
|
|
onCbNotice,
|
|
onCbTip,
|
|
onCbPassword,
|
|
} from './src/cbCallbacks.js'
|
|
import ChaturbateAuth from './src/ChaturbateAuth.js'
|
|
import fs from 'node:fs'
|
|
import Database from 'better-sqlite3';
|
|
import os from 'node:os'
|
|
import path from 'node:path'
|
|
import yargs from 'yargs'
|
|
import { hideBin } from 'yargs/helpers'
|
|
import $fastq from 'fastq';
|
|
import gotClient from './src/gotClient.js'
|
|
import { signalStart } from './src/faye.js'
|
|
import { getPushServiceAuth } from './src/headless.js'
|
|
|
|
|
|
interface IRoomRecord {
|
|
name: string;
|
|
id: string;
|
|
monitor: number;
|
|
}
|
|
|
|
|
|
const fastq = $fastq.promise
|
|
const epoch = new Date(0).valueOf()
|
|
const now = new Date().valueOf()
|
|
|
|
|
|
async function getDataDir() {
|
|
// Define dataDir
|
|
const dataDir = path.join(os.homedir(), '.local', 'share', 'futureporn-scout');
|
|
|
|
try {
|
|
// Check if dataDir exists
|
|
await fs.promises.access(dataDir);
|
|
} catch (error) {
|
|
// Create dataDir if it doesn't exist
|
|
await fs.promises.mkdir(dataDir, { recursive: true });
|
|
}
|
|
|
|
return dataDir;
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async function init() {
|
|
const dataDir = await getDataDir()
|
|
const dbPath = path.join(dataDir, 'scout.db')
|
|
|
|
|
|
await assertYtdlpExistence()
|
|
|
|
const db = new Database(dbPath);
|
|
db.pragma('journal_mode = WAL');
|
|
|
|
const logger = loggerFactory({
|
|
defaultMeta: { service: 'futureporn/scout' }
|
|
})
|
|
|
|
return getAppContext(appEnv, logger, db, sound, cb, dataDir, gotClient)
|
|
}
|
|
|
|
/**
|
|
*
|
|
* listen to room status messages 24/7
|
|
* when events are received on the ably realtime client, we log the appropriate message to db.
|
|
*
|
|
* @deprecated
|
|
*/
|
|
async function registerRoomStatusListeners(appContext, room) {
|
|
|
|
console.log(`>> registering ${room.name}`);
|
|
|
|
await Promise.allSettled([room.id, room.dossier]);
|
|
|
|
const psa = await getPushServiceAuth(room.url);
|
|
const channelMap = Room.getChannelMapFromPsa(psa);
|
|
const realtimeClient = await getSuperRealtimeClient(room.name, psa);
|
|
console.log(realtimeClient);
|
|
|
|
realtimeClient.connection.once('connected', () => {
|
|
appContext.logger.log({ level: 'info', message: `${room.name} CB Realtime Connected!` });
|
|
appContext.logger.log({ level: 'debug', message: `connection.state=${realtimeClient.connection.state}` });
|
|
|
|
const realtimeChannelName = Room.getRealtimeChannelNameFromCbChannelName(channelMap, `RoomStatusTopic#RoomStatusTopic:${room.id}`);
|
|
const realtimeChannel = realtimeClient.channels.get(realtimeChannelName);
|
|
|
|
// console.log(channels);
|
|
|
|
// const realtimeChannelName = ``;
|
|
// const realtimeChannelName = channels[`RoomStatusTopic#RoomStatusTopic:${room.id}`];
|
|
// appContext.logger.log({ level: 'info', message: `>> subscribing to ${realtimeChannelName}` });
|
|
|
|
realtimeChannel.subscribe((message) => {
|
|
appContext.logger.log({ level: 'info', message: `got a message from realtime, as follows.` });
|
|
console.log(message);
|
|
room.onStatus(message);
|
|
});
|
|
|
|
|
|
// for (const room of publicRooms) {
|
|
// room.monitorChannel(room.getAblyChannelName('status'), room.onStatus)
|
|
|
|
// // @see https://github.com/futureporn/futureporn-scout/issues/3
|
|
// // @todo these are for short-term forcing of, 'we need to get up and running' mandate
|
|
// // these subscriptions need to be removed from here and added on-demand when
|
|
// // a room goes live or when admin (me) manually monitors them via api
|
|
// // room.monitorChannel(room.getAblyChannelName('title'), room.onTitle)
|
|
// // room.monitorChannel(room.getAblyChannelName('tip'), room.onTip)
|
|
|
|
// }
|
|
})
|
|
|
|
realtimeClient.connect();
|
|
|
|
return realtimeClient;
|
|
}
|
|
|
|
|
|
async function record(room: string) {
|
|
const appContext = await init();
|
|
appContext.faye = faye.fayeFactory(appContext)
|
|
const playlistUrl = await cb.getPlaylistUrl(appContext, room);
|
|
signalStart(appContext, room, playlistUrl);
|
|
}
|
|
|
|
|
|
async function getRoomsFromDb(appContext: IAppContext) {
|
|
const stmt = appContext.db.prepare('SELECT name, id FROM rooms WHERE monitor = TRUE')
|
|
const roomsRecords = stmt.all();
|
|
return roomsRecords;
|
|
}
|
|
|
|
async function serveApi(appContext: IAppContext) {
|
|
appContext.expressApp.get('/', function (req: Request, res: Response) {
|
|
res.send('*futureporn-scout pisses on the floor*');
|
|
});
|
|
appContext.expressApp.get('/rooms', function (req: Request, res: Response) {
|
|
res.status(200).json({ rooms: Object.entries(appContext.rooms).map((entry) => entry[0]) });
|
|
});
|
|
appContext.expressApp.post('/rooms', function (req: Request, res: Response) {
|
|
const name = req.query.name as string;
|
|
try {
|
|
Room.roomNameSchema.parse(name);
|
|
} catch (e) {
|
|
return res.status(400).json({ code: 400, error: true, message: 'name must be sent as a query param' });
|
|
}
|
|
try {
|
|
appContext.logger.log({ level: 'debug', message: `Creating room name=${name}` });
|
|
createRoom(appContext, name)
|
|
} catch (e) {
|
|
return res.status(500).json({ code: 500, error: true, message: `failed to create ${name}. ${e}` });
|
|
}
|
|
res.send(`${name} added to watchlist.`);
|
|
})
|
|
appContext.expressApp.delete('/rooms', function (req: Request, res: Response) {
|
|
const name = req.query.name as string;
|
|
try {
|
|
Room.roomNameSchema.parse(name);
|
|
} catch (e) {
|
|
return res.status(400).json({ code: 400, error: true, message: 'name must be sent as a query param' });
|
|
}
|
|
try {
|
|
deleteRoom(appContext, name);
|
|
return res.send(`${name} deleted from watchlist.`);
|
|
} catch (e) {
|
|
return res.status(500).json({ code: 500, error: true, message: `failed to delete. ${e}` });
|
|
}
|
|
})
|
|
const port = process.env.PORT || 3030;
|
|
appContext.expressApp.listen(port);
|
|
appContext.logger.log({ level: 'info', message: `REST API server listening on port ${port}` });
|
|
}
|
|
|
|
|
|
function createRoom (appContext: IAppContext, name: string): Room {
|
|
if (!!appContext.rooms[name]) {
|
|
appContext.logger.log({ level: 'info', message: `${name} is already being watched.` });
|
|
return;
|
|
}
|
|
appContext.logger.log({ level: 'info', message: `creating room ${name}`});
|
|
const room = new Room(appContext, {
|
|
name: name,
|
|
id: null,
|
|
onStatus: (m: Ably.Types.Message) => onCbStatus(appContext, name, m),
|
|
onMessage: (m: Ably.Types.Message) => onCbMessage(appContext, name, m),
|
|
onSilence: (m: Ably.Types.Message) => onCbSilence(appContext, name, m),
|
|
onTitle: (m: Ably.Types.Message) => onCbTitle(appContext, name, m),
|
|
onTip: (m: Ably.Types.Message) => onCbTip(appContext, name, m),
|
|
onPassword: (m: Ably.Types.Message) => onCbPassword(appContext, name, m),
|
|
onNotice: (m: Ably.Types.Message) => onCbNotice(appContext, name, m),
|
|
})
|
|
appContext.rooms[name] = room;
|
|
room.startWatching();
|
|
return room;
|
|
}
|
|
|
|
async function deleteRoom (appContext: IAppContext, roomName: string): Promise<void> {
|
|
const existingRoom = appContext.rooms[roomName];
|
|
if (!existingRoom) appContext.logger.log({ level: 'warn', message: `Cannot delete ${roomName} because it is not in the watchlist.` });
|
|
const room = appContext.rooms[roomName]
|
|
// console.log('here is the room')
|
|
// console.log(room)
|
|
|
|
await room.stopWatching();
|
|
delete appContext.rooms[roomName];
|
|
}
|
|
|
|
|
|
async function daemon() {
|
|
const appContext: IAppContext = await init();
|
|
appContext.faye = faye.fayeFactory(appContext);
|
|
const roomsRecords = await getRoomsFromDb(appContext);
|
|
|
|
appContext.logger.log({ level: 'info', message: `Watching the following rooms` });
|
|
for (const room of roomsRecords) {
|
|
appContext.logger.log({ level: 'info', message: ` ○ ${room.name}` });
|
|
createRoom(appContext, room.name);
|
|
}
|
|
|
|
// for (const room of Object.entries(appContext.rooms)) {
|
|
// const r = room[1];
|
|
// r.startWatching();
|
|
// }
|
|
|
|
serveApi(appContext);
|
|
}
|
|
|
|
|
|
/**
|
|
* get the room's dossier
|
|
*/
|
|
async function dossier(roomName: string) {
|
|
const appContext = await init()
|
|
const r = new Room(appContext, {
|
|
name: roomName
|
|
})
|
|
const doss = await r.getInitialRoomDossier()
|
|
console.log(doss)
|
|
}
|
|
|
|
|
|
/**
|
|
*
|
|
* exportLogs
|
|
* export chat logs into a format suitable for publishing
|
|
* @todo
|
|
*/
|
|
async function exportLogs(options) {
|
|
const appContext = await init()
|
|
|
|
// let stmt = appContext.db.prepare('SELECT ...')
|
|
// if (options.auto) {
|
|
// const stream = new Stream(appContext, options.room);
|
|
// const r = stream.getMostRecentStream()
|
|
// console.log(r)
|
|
// }
|
|
|
|
// use lifecycles database to find most recent start & stop events.
|
|
// use a 5 minute threshold to determine stream starts/s
|
|
// stmt = appContext.db.prepare(`SELECT * FROM messages WHERE _room = '${options.room}' ORDER BY data_ts ASC;`)
|
|
// } else if (!!options.since && !!options.until) {
|
|
// stmt = appContext.db.prepare(`SELECT * FROM messages WHERE _room = '${options.room}' AND data_ts >= ${options.since} ORDER BY data_ts ASC;`)
|
|
// } else if (!!options.since && !options.until) {
|
|
// stmt = appContext.db.prepare(`SELECT * FROM messages WHERE _room = '${options.room}' AND data_ts >= ${options.since} ORDER BY data_ts ASC;`)
|
|
// } else if (!options.since && !!options.until) {
|
|
// stmt = appContext.db.prepare(`SELECT * FROM messages WHERE _room = '${options.room}' AND data_ts <= ${options.until} ORDER BY data_ts ASC;`)
|
|
// }
|
|
// const messages = stmt.all()
|
|
|
|
// console.log(messages)
|
|
}
|
|
|
|
yargs(hideBin(process.argv))
|
|
.command({
|
|
command: 'daemon',
|
|
alias: 'd',
|
|
desc: 'Listen & log chaturbate events',
|
|
builder: () => { },
|
|
handler: (argv) => {
|
|
console.info(argv)
|
|
daemon()
|
|
}
|
|
})
|
|
.command({
|
|
command: 'dossier',
|
|
desc: `Get the room's dossier`,
|
|
builder: (yargs) => {
|
|
return yargs
|
|
.option('room', {
|
|
alias: 'r',
|
|
describe: 'name of the CB room',
|
|
required: true
|
|
})
|
|
},
|
|
handler: (argv) => {
|
|
console.info(argv)
|
|
dossier(argv.room)
|
|
}
|
|
})
|
|
.command({
|
|
command: 'record',
|
|
alias: 'r',
|
|
desc: 'Manually start recording a CB stream',
|
|
builder: (yargs) => {
|
|
return yargs
|
|
.option('room', {
|
|
aliases: ['r', 'n'],
|
|
describe: 'the name of the chaturbate room',
|
|
required: true
|
|
})
|
|
},
|
|
handler: (argv) => {
|
|
record(argv.room)
|
|
}
|
|
})
|
|
.command({
|
|
command: 'export',
|
|
alias: 'e',
|
|
desc: 'Export chat logs as JSON',
|
|
builder: (yargs) => {
|
|
return yargs
|
|
.option('output', {
|
|
alias: 'o',
|
|
describe: 'output file location',
|
|
required: true
|
|
})
|
|
.option('room', {
|
|
aliases: ['r', 'n'],
|
|
describe: 'the name of the chaturbate room',
|
|
required: true
|
|
})
|
|
.option('since', {
|
|
alias: 's',
|
|
describe: 'export logs since this date',
|
|
default: epoch,
|
|
required: false
|
|
})
|
|
.option('until', {
|
|
alias: 'u',
|
|
describe: 'export logs until this date',
|
|
default: now,
|
|
required: false
|
|
})
|
|
// .option('auto', {
|
|
// alias: 'a',
|
|
// describe: 'gets the most recent stream',
|
|
// required: false
|
|
// })
|
|
},
|
|
handler: (argv) => {
|
|
if (argv.auto && (argv.since !== epoch || argv.until !== now)) {
|
|
throw new Error('cannot use --auto with --since or --until.')
|
|
}
|
|
console.info(argv)
|
|
exportLogs(argv)
|
|
}
|
|
})
|
|
.demandCommand(1)
|
|
.help()
|
|
.parse() |