This commit is contained in:
cj@futureporn.net 2023-09-06 15:12:01 -08:00
parent 94011060a1
commit befe01cc7f
12 changed files with 472 additions and 2316 deletions

View File

@ -11,15 +11,19 @@
Daemon mode. Log chat and room events
./index.js -D
node ./dist/index.js daemon
Export mode. Export sql chat messages as IndexedDB
Dossier mode. Get a room's initialRoomDossier. Helpful for getting a room's Chaturbate ID.
node ./dist/index.js dossier -r projektmelody
## Dev notes
Future idea: Export mode. Export sql chat messages as IndexedDB
test
### TUI

7
index.ts Normal file → Executable file
View File

@ -204,11 +204,8 @@ async function registerRoomStatusListeners(appContext, rooms, cb, ably) {
// @todo these are for short-term forcing of, 'we need to get up and running' mandate
// these subscriptions need to be removed from here and added on-demand when
// a room goes live or when admin (me) manually monitors them via api
room.monitorChannel(room.getAblyChannelName('message'), room.onMessage)
room.monitorChannel(room.getAblyChannelName('silence'), room.onSilence)
room.monitorChannel(room.getAblyChannelName('title'), room.onTitle)
room.monitorChannel(room.getAblyChannelName('tip'), room.onTip)
room.monitorChannel(room.getAblyChannelName('password'), room.onPassword)
// room.monitorChannel(room.getAblyChannelName('title'), room.onTitle)
// room.monitorChannel(room.getAblyChannelName('tip'), room.onTip)
}
})

File diff suppressed because it is too large Load Diff

View File

@ -103,58 +103,63 @@ export default class ChaturbateAuth {
}
}
// the number suffixes are assumed to be session ids.
// opening multiple tabs of the same room will not increment the :n suffix.
// however, opening different rooms in multiple tags will increment the :n suffix
// maybe Ably does request deduplication for same sesionIds???
// we don't need to compute the sessionId, as pushServiceAuth response contains it.
static getPermissionsForm(roomIds: string[], topics: string[], csrfToken: string): FormData {
if (!roomIds || roomIds.length === 0) {
throw new Error('roomIds is required but it was undefined or empty');
static getPresenceId(): string {
return Math.random().toString(36).substring(2)
}
const generateTopic = (topicKey: string, topicValue: string) => {
return JSON.stringify({ [topicKey]: topicValue });
};
let form = new FormData();
let formTopics = {};
roomIds.forEach((roomId) => {
if (topics.includes('message')) {
formTopics[`RoomMessageTopic#RoomMessageTopic:${roomId}`] = { "broadcaster_uid": roomId }
}
// the number suffixes are assumed to be session ids.
// opening multiple tabs of the same room will not increment the :n suffix.
// however, opening different rooms in multiple tags will increment the :n suffix
// maybe Ably does request deduplication for same sesionIds???
// we don't need to compute the sessionId, as pushServiceAuth response contains it.
static getPermissionsForm(roomIds: string[], topics: string[], csrfToken: string): FormData {
if (!roomIds || roomIds.length === 0) {
throw new Error('roomIds is required but it was undefined or empty');
}
if (topics.includes('silence')) {
formTopics[`RoomSilenceTopic#RoomSilenceTopic:${roomId}`] = {"broadcaster_uid":roomId}
}
const generateTopic = (topicKey: string, topicValue: string) => {
return JSON.stringify({ [topicKey]: topicValue });
};
if (topics.includes('status')) {
formTopics[`RoomStatusTopic#RoomStatusTopic:${roomId}`] = {"broadcaster_uid":roomId}
}
if (topics.includes('title')) {
formTopics[`RoomTitleChangeTopic#RoomTitleChangeTopic:${roomId}`] = { "broadcaster_uid": roomId }
}
let form = new FormData();
let formTopics = {};
if (topics.includes('tip')) {
formTopics[`RoomTipAlertTopic#RoomTipAlertTopic:${roomId}`] = { "broacaster_uid": roomId}
}
roomIds.forEach((roomId) => {
if (topics.includes('password')) {
formTopics[`RoomPasswordProtectedTopic#RoomPasswordProtectedTopic:${roomId}`] = { "broadcaster_uid": roomId}
}
if (topics.includes('message')) {
formTopics[`RoomMessageTopic#RoomMessageTopic:${roomId}`] = { "broadcaster_uid": roomId }
}
});
// @todo this one is somehow bad https://gitea.futureporn.net/futureporn/futureporn-scout/issues/3
// if (topics.includes('silence')) {
// formTopics[`RoomSilenceTopic#RoomSilenceTopic:${roomId}`] = { "broadcaster_uid": roomId }
// }
form.append('topics', JSON.stringify(formTopics))
form.append('csrfmiddlewaretoken', csrfToken);
if (topics.includes('status')) {
formTopics[`RoomStatusTopic#RoomStatusTopic:${roomId}`] = { "broadcaster_uid": roomId }
}
return form;
}
// @todo this one is somehow bad https://gitea.futureporn.net/futureporn/futureporn-scout/issues/3
// if (topics.includes('title')) {
// formTopics[`RoomTitleChangeTopic#RoomTitleChangeTopic:${roomId}`] = { "broadcaster_uid": roomId }
// }
if (topics.includes('tip')) {
formTopics[`RoomTipAlertTopic#RoomTipAlertTopic:${roomId}`] = { "broacaster_uid": roomId }
}
if (topics.includes('password')) {
formTopics[`RoomPasswordProtectedTopic#RoomPasswordProtectedTopic:${roomId}`] = { "broadcaster_uid": roomId }
}
});
form.append('topics', JSON.stringify(formTopics))
form.append('csrfmiddlewaretoken', csrfToken);
return form;
}
static getTokenCookie(cookies) {
return cookies.find((c) => c.key == 'csrftoken');
}
@ -195,11 +200,11 @@ export default class ChaturbateAuth {
* more info-- https://ably.com/docs/core-features/authentication#token-authentication
*/
async getPushServiceAuth(
roomUrl: string,
form: FormData,
roomUrl: string,
form: FormData,
cookieString: string
) {
// https://faqs.ably.com/40104-timestamp-not-current
// this was caused by signed token request re-use after it had expired.

View File

@ -46,6 +46,37 @@ interface Topics {
[topicId: string]: RoomTopic;
}
export interface IRoomMessage {
name: string;
id: string;
encoding: null;
data: {
tid: string;
ts: number;
_topic: string;
message: string;
font_family: string;
font_color: string;
background: string;
id: string;
from_user: {
username: string;
gender: string;
is_broadcaster: boolean;
in_fanclub: boolean;
is_following: boolean;
is_mod: boolean;
has_tokens: boolean;
tipped_recently: boolean;
tipped_alot_recently: boolean;
tipped_tons_recently: boolean;
};
status?: string;
method: string;
pub_ts: number;
};
}
export default class Room {
private id: string | Promise<string>;
@ -61,7 +92,7 @@ export default class Room {
private realtimeHost: string | null;
private fallbackHosts: string[] | null;
private pushServiceAuth: any;
private onStatus: ((message: any) => void);
private onStatus: ((message: IRoomMessage) => void);
private onMessage: ((message: any) => void);
private onSilence: ((message: any) => void);
private onTitle: ((message: any) => void);
@ -108,6 +139,7 @@ export default class Room {
handleError (err: string) {
this.logger.log({ level: 'debug', message: `handling room error. ${err}`})
this.errors.push(err)
if (/room has a password/.test(err)) this.pubOrPass = PubOrPass.Password
}
@ -202,10 +234,13 @@ export default class Room {
}
static formalizeChannelsRequest(channelsRequest: ChannelsRequest, csrfToken: string): FormData {
let form = new FormData();
form.append('topics', JSON.stringify(channelsRequest))
form.append('csrfmiddlewaretoken', csrfToken);
form.append('backend', 'a')
// form.append('presence_id', ChaturbateAuth.getPresenceId())
return form;
}
@ -223,14 +258,15 @@ export default class Room {
// }
public async getChannelsRequest(): Promise<ChannelsRequest> {
const roomId: string = await this.id;
if (!roomId) throw new Error(`cannot get Room's permissions because a room.id could not be fetched`);
let crForm = {}
crForm[`RoomMessageTopic#RoomMessageTopic:${roomId}`] = { "broadcaster_uid": roomId }
crForm[`RoomSilenceTopic#RoomSilenceTopic:${roomId}`] = { "broadcaster_uid": roomId }
// crForm[`RoomSilenceTopic#RoomSilenceTopic:${roomId}`] = { "broadcaster_uid": roomId }
crForm[`RoomStatusTopic#RoomStatusTopic:${roomId}`] = { "broadcaster_uid": roomId }
crForm[`RoomTitleChangeTopic#RoomTitleChangeTopic:${roomId}`] = { "broadcaster_uid": roomId }
// crForm[`RoomTitleChangeTopic#RoomTitleChangeTopic:${roomId}`] = { "broadcaster_uid": roomId }
crForm[`RoomTipAlertTopic#RoomTipAlertTopic:${roomId}`] = { "broadcaster_uid": roomId }
crForm[`RoomPasswordProtectedTopic#RoomPasswordProtectedTopic:${roomId}`] = { "broadcaster_uid": roomId}
this.channelsRequest = crForm
@ -340,26 +376,58 @@ export default class Room {
return this.channelMap[cbTopic]
}
/*
{
"channels": {
"RoomTipAlertTopic#RoomTipAlertTopic:VVKHB7L": "room:tip_alert:VVKHB7L",
"RoomPurchaseTopic#RoomPurchaseTopic:VVKHB7L": "room_unsessioned:grouped:VVKHB7L",
"RoomFanClubJoinedTopic#RoomFanClubJoinedTopic:VVKHB7L": "room:fanclub:VVKHB7L",
"RoomMessageTopic#RoomMessageTopic:VVKHB7L": "room:grouped:VVKHB7L:6",
"GlobalPushServiceBackendChangeTopic#GlobalPushServiceBackendChangeTopic": "global:push_service",
"RoomAnonPresenceTopic#RoomAnonPresenceTopic:VVKHB7L": "room_anon:presence:VVKHB7L:6",
"QualityUpdateTopic#QualityUpdateTopic:VVKHB7L": "room_unsessioned:grouped:VVKHB7L",
"RoomNoticeTopic#RoomNoticeTopic:VVKHB7L": "room_unsessioned:grouped:VVKHB7L",
"RoomPasswordProtectedTopic#RoomPasswordProtectedTopic:VVKHB7L": "room:grouped:VVKHB7L:6",
"RoomModeratorPromotedTopic#RoomModeratorPromotedTopic:VVKHB7L": "room_unsessioned:grouped:VVKHB7L",
"RoomModeratorRevokedTopic#RoomModeratorRevokedTopic:VVKHB7L": "room_unsessioned:grouped:VVKHB7L",
"RoomStatusTopic#RoomStatusTopic:VVKHB7L": "room:grouped:VVKHB7L:6",
"RoomTitleChangeTopic#RoomTitleChangeTopic:VVKHB7L": "room_unsessioned:grouped:VVKHB7L",
"RoomSilenceTopic#RoomSilenceTopic:VVKHB7L": "room_unsessioned:grouped:VVKHB7L",
"RoomKickTopic#RoomKickTopic:VVKHB7L": "room_unsessioned:grouped:VVKHB7L",
"RoomUpdateTopic#RoomUpdateTopic:VVKHB7L": "room_unsessioned:grouped:VVKHB7L",
"RoomSettingsTopic#RoomSettingsTopic:VVKHB7L": "room_unsessioned:grouped:VVKHB7L",
"RoomEnterLeaveTopic#RoomEnterLeaveTopic:VVKHB7L": "room:enter_leave:VVKHB7L",
"GameUpdateTopic#GameUpdateTopic:VVKHB7L": "room_unsessioned:grouped:VVKHB7L"
}
}
*/
/**
*
* getChaturbateTopicString
*
* Chaturbate now groups message together. the topics that are grouped are
*
* * status, message, password
* * title, silence
*
* @see https://gitea.futureporn.net/futureporn/futureporn-scout/issues/3
*
* @param genericTopic
* @returns
*/
getChaturbateTopicString(genericTopic: string): string {
let ch: string;
if (genericTopic === 'status') {
if (genericTopic === 'status' || genericTopic === 'message' || genericTopic === 'password') {
ch = `RoomStatusTopic#RoomStatusTopic:${this.id}`
}
else if (genericTopic === 'message') {
ch = `RoomMessageTopic#RoomMessageTopic:${this.id}`
}
else if (genericTopic === 'tip') {
ch = `RoomTipAlertTopic#RoomTipAlertTopic:${this.id}`
}
else if (genericTopic === 'silence') {
ch = `RoomSilenceTopic#RoomSilenceTopic:${this.id}`
}
else if (genericTopic === 'title') {
else if (genericTopic === 'title' || genericTopic === 'silence') {
ch = `RoomTitleChangeTopic#RoomTitleChangeTopic:${this.id}`
}
else if (genericTopic === 'password') {
ch = `RoomPasswordProtectedTopic#RoomPasswordProtectedTopic:${this.id}`
}
return ch
}
@ -370,9 +438,6 @@ export default class Room {
*/
async monitorChannel(channelTopicString: string, eventHandler: Function): Promise<void> {
console.log('lets monitor a channel')
console.log(channelTopicString) // RoomMessageTopic#RoomMessageTopic:BFQTTPV
console.log(this.ablyWrapper)
// assert ably realtime connection
if (this.ablyWrapper.realtimeClient.connection.state !== 'connected') {
@ -383,8 +448,6 @@ export default class Room {
// subscribe to realtime channel
// attach callbacks to subscription handler
const realtimeChannel = this.ablyWrapper.realtimeClient.channels.get(channelTopicString);
console.log('here are the realtime channels')
console.log(this.ablyWrapper.realtimeClient.channels)
realtimeChannel.subscribe((message) => {
eventHandler(message)
@ -404,75 +467,6 @@ export default class Room {
// async monitorRealtime(): Promise<void> {
// if (!this.pushServiceAuth) this.pushServiceAuth = await this.getPushServiceAuth();
// if (!this.id) throw new Error('roomId was missing')
// const roomMessageTopicString = this.pushServiceAuth.channels[`RoomMessageTopic#RoomMessageTopic:${this.id}`]
// const roomSilenceTopicString = this.pushServiceAuth.channels[`RoomSilenceTopic#RoomSilenceTopic:${this.id}`]
// const roomStatusTopicString = this.pushServiceAuth.channels[`RoomStatusTopic#RoomStatusTopic:${this.id}`]
// const roomTitleChangeTopicString = this.pushServiceAuth.channels[`RoomTitleChangeTopic#RoomTitleChangeTopic:${this.id}`]
// const roomTipAlertTopicString = this.pushServiceAuth.channels[`RoomTipAlertTopic#RoomTipAlertTopic:${this.id}`]
// const roomPasswordProtectedString = this.pushServiceAuth.channels[`RoomPasswordProtectedTopic#RoomPasswordProtectedTopic:${this.id}`]
// // We probably don't need to call this
// // once per room. Once per scout is probably enough.
// // we need to refactor for this to work.
// this.realtime = await this.ably.getRealtimeClient()
// this.realtime.connection.once('connected', (idk) => {
// this.logger.log({ level: 'info', message: 'CB Realtime Connected!' })
// })
// const messageChannel = this.realtime.channels.get(roomMessageTopicString);
// messageChannel.subscribe((message) => {
// this.onMessage(message)
// })
// const silenceChannel = this.realtime.channels.get(roomSilenceTopicString);
// silenceChannel.subscribe((message) => {
// this.onSilence(message)
// })
// const statusChannel = this.realtime.channels.get(roomStatusTopicString);
// statusChannel.subscribe((message) => {
// this.logger.log({ level: 'debug', message: `got statusChannel message! ${JSON.stringify(message)}`})
// if (message.data.status === 'public') {
// this.onStart(message)
// } else if (message.data.status === 'offline') {
// this.onStop(message)
// }
// this.logger.log({ level: 'debug', message: `Received room:status:<roomId>:0` })
// this.logger.log({ level: 'debug', message: JSON.stringify(message, null, 2) })
// });
// const titleChannel = this.realtime.channels.get(roomTitleChangeTopicString);
// titleChannel.subscribe((message) => {
// this.onTitle(message)
// })
// const tipAlertChannel = this.realtime.channels.get(roomTipAlertTopicString);
// tipAlertChannel.subscribe((message) => {
// this.logger.log({ level: 'debug', message: `got tipAlertChannel message! ${JSON.stringify(message)}`})
// this.onTip(message)
// })
// const passwordProtectedChannel = this.realtime.channels.get(roomPasswordProtectedString);
// passwordProtectedChannel.subscribe((message) => {
// this.logger.log({ level: 'debug', message: `got passwordProtectedChannel message! ${JSON.stringify(message)}` })
// this.onPassword(message)
// })
// await this.realtime.connect()
// }
// async refreshToken (): Promise<() => void> {
// const authToken = await this.getNewToken()
// }

160
src/Twitter.ts Normal file
View File

@ -0,0 +1,160 @@
import { IAppContext } from "./appContext";
import { isBefore, add, formatISO } from 'date-fns';
import TwitterClient from 'twitter-api-scraper'
import puppeteer from 'puppeteer-extra';
import StealthPlugin from 'puppeteer-extra-plugin-stealth'
export interface ITweet {
id: string;
full_text: string;
}
export interface IScraperQuery {
terms: string;
}
const projektMelodyEpoch = new Date('2020-02-07')
export default class Twitter {
private appContext: IAppContext;
private name: string;
private startDate: Date;
constructor(appContext, opts) {
this.appContext = appContext;
this.name = opts.name;
this.startDate = opts.startDate || projektMelodyEpoch
}
async getAllTweets(): Promise<ITweet[]> {
puppeteer.use(StealthPlugin())
// Launch the browser and open a new blank page
const browser = await puppeteer.launch({ headless: false });
const page = await browser.newPage();
// Navigate the page to a URL
await page.goto(`https://twitter.com/${this.name}`);
// Set screen size
await page.setViewport({width: 1366, height: 768});
// // Type into search box
// await page.type('.search-box__input', 'automate beyond recorder');
// // Wait and click on first result
const searchResultSelector = '.r-1ljd8xs > div:nth-child(1)';
await page.waitForSelector(searchResultSelector);
// await page.click(searchResultSelector);
// // Locate the full title with a unique string
// const textSelector = await page.waitForSelector(
// 'text/Customize and automate'
// );
// const fullTitle = await textSelector?.evaluate(el => el.textContent);
// // Print the full title
// console.log('The title of this blog post is "%s".', fullTitle);
await new Promise(r => setTimeout(r, 8000));
await browser.close();
return []
}
async search(
client: any,
scraperQuery: IScraperQuery,
nextToken: string | null,
startDate: Date,
endDate: Date
): Promise<ITweet[]> {
const later = (delay: number, value: any) => {
// greets stackoverflow, i think
return new Promise(resolve => setTimeout(resolve, delay, value));
};
await later(200, null); // courtesy delay
// const result = await SearchQuery(scraperQuery)
const result = await client.search(scraperQuery, 100, nextToken);
const tweets: ITweet[] = [];
if (result?.tweets) {
for (const tweetId in result.tweets) {
const tweet = result.tweets[tweetId];
console.log(` [🐦] ${tweet.id_str} ${tweet.full_text.substring(0, 70)}`);
tweets.push({ id: tweet.id_str, full_text: tweet.full_text }); // Add other properties as needed
}
}
// get the next page if there is one
if (result?.nextToken) {
const additionalTweets = await this.search(
client,
scraperQuery,
result.nextToken,
startDate,
endDate
);
tweets.push(...additionalTweets);
}
return tweets;
}
/**
*
* query Twitter API for all of an account's tweets
*
* @warning There is a potential future problem with the /2/users/:id/tweets endpoint
* in that pagination only returns the 3200 most recent tweets.
* A potential workaround could be date-based search,
* but this is a future problem as we're only seeing ~2300 tweets as of May 1 2022
*
* see https://developer.twitter.com/en/docs/twitter-api/tweets/timelines/api-reference/get-users-id-tweets
*
*/
async query(client, startDate, endDate): Promise<ITweet[]> {
const scraperQuery = { terms: `from:${this.name} -filter:retweets filter:links since:${formatISO(startDate)} until:${formatISO(endDate)}` };
const results = await this.search(client, scraperQuery, undefined, startDate, endDate)
return results
}
async getAllTweetsOld(): Promise<ITweet[]> {
const client = new TwitterClient['default']();
await client.connect()
let tweets = []
// get all of Mel's tweets
// because there is a limit of 3200 tweets
// using regular pagination method, we instead
// scrape twitter, making several searches using 5 day ranges
// between the date that melody started streaming on CB and now
for await (let [start, end] of this.getDateRange(this.startDate)) {
const queryResults = await this.query(client, start, end);
tweets.concat(queryResults)
}
return tweets
}
/**
* get dates spaced a few days apart
*/
async *getDateRange(startDate: Date): AsyncIterable<[Date, Date]> {
let dateCounter = startDate;
while (isBefore(dateCounter, new Date())) {
const start = dateCounter;
const end = add(dateCounter, { days: 8 });
dateCounter = end;
yield [start, end];
}
}
}

View File

@ -1,6 +1,8 @@
// import { createVod } from './strapi.js'
import Stream from './Stream.js'
import { signalStart, sendSignal } from './faye.js'
import { IRoomMessage } from './Room.js'
import { IAppContext } from './appContext.js'
// import { execa } from 'execa'
export const onCbPassword = async (appContext, roomName, message) => {
@ -113,7 +115,8 @@ export const onCbStop = (appContext, roomName, message) => {
'data_pub_ts': message?.data?.pub_ts || null,
})
sendSignal(appContext, roomName, 'stop')
sendSignal(appContext, { signal: 'stop', room: roomName })
@ -123,16 +126,36 @@ export const onCbStop = (appContext, roomName, message) => {
// appContext.roomTimers[roomName].offline = setTimeout(() => createStreamVOD(appContext, message), 1000*60*5)
}
export const onCbStatus = async (appContext, roomName, message) => {
appContext.logger.log({ level: 'debug', message: `[STREAM STATUS] ${JSON.stringify(message)}` })
if (message.data.status === 'public') {
/**
*
* onCbStatus
*
* CB changed their systems sometime around fall 2023 to where mutiple event types are grouped into a single channel. Due to this, a status callback might actually contain a chat message, so we need to test _topic and see what type of event data we are dealing with.
* @see https://gitea.futureporn.net/futureporn/futureporn-scout/issues/3
*
* @param appContext
* @param roomName
* @param message
*/
export const onCbStatus = async (appContext: IAppContext, roomName: string, message: IRoomMessage) => {
if (message.data._topic === 'RoomMessageTopic') {
onCbMessage(appContext, roomName, message)
} else if (message.data?.status === 'public') {
onCbStart(appContext, roomName, message)
} else if (message.data.status === 'offline') {
} else if (message.data?.status === 'offline') {
onCbStop(appContext, roomName, message)
}
}
export const createStreamVOD = (appContext, message) => {
/**
*
* @todo
* @param appContext
* @param message
*/
export const createStreamVOD = (appContext: IAppContext, message: IRoomMessage) => {
// find the relevant StreamSegments
// group the StreamSegments into a Stream
// create a vod using the start/stop timestamps of the Stream, and the messages falling within those timestamps
@ -140,7 +163,7 @@ export const createStreamVOD = (appContext, message) => {
const stream = new Stream()
stream.getStreamSegmentsUsingEndTs(message.data.ts)
stream.getChat()
stream.uploadToStrapi(appContext)
// stream.uploadToStrapi(appContext)
}
export const onCbTitle = (appContext, roomName, message) => {
@ -199,6 +222,8 @@ export const onCbSilence = (appContext, roomName, message) => {
export const onCbMessage = (appContext, roomName, message) => {
appContext.logger.log({ level: 'debug', message: `[CHAT MESSAGE] ${JSON.stringify(message)}` })
const stmt = appContext.db.prepare(`INSERT INTO messages VALUES (
$_room,
$name,

22
src/metascraper-tryout.js Normal file
View File

@ -0,0 +1,22 @@
import metascraper from 'metascraper';
import metascraperUrl from 'metascraper-url';
import metascraperTitle from 'metascraper-title';
import metascraperImage from 'metascraper-image';
import metascraperDate from 'metascraper-date';
import metascraperDescription from 'metascraper-description';
import metascraperPublisher from 'metascraper-publisher';
const siteUrl = 'https://twitter.com/projektMelody';
const response = await fetch(siteUrl);
const html = await response.text();
const url = response.url;
const metadata = await metascraper([
metascraperUrl(),
metascraperTitle(),
metascraperImage(),
metascraperDate(),
metascraperDescription(),
metascraperPublisher(),
])({ html, url });
console.log(metadata);

86
src/twitter.js.old Normal file
View File

@ -0,0 +1,86 @@
#!/usr/bin/env node
import Twitter from 'twitter-v2';
import { loggerFactory } from './logger.js'
const logger = loggerFactory({
defaultMeta: { service: "futureporn/scout" }
})
const twitterConsumerKey = process.env.TWITTER_API_KEY;
const twitterConsumerSecret = process.env.TWITTER_API_KEY_SECRET;
const projektMelodyTwitterId = '1148121315943075841';
if (typeof twitterConsumerKey === 'undefined')
throw new Error('TWITTER_API_KEY is undefined');
if (typeof twitterConsumerSecret === 'undefined')
throw new Error('TWITTER_API_KEY_SECRET is undefined');
async function delay(timeout) {
logger.log({ level: 'debug', message: ` [*] delaying for ${timeout}ms` });
await new Promise(resolve => setTimeout(resolve, timeout));
}
var client = new Twitter({
consumer_key: twitterConsumerKey,
consumer_secret: twitterConsumerSecret
});
async function setup() {
const ruleBody = {
'add': [
{
'value': 'from:projektmelody -is:retweet',
'tag': 'tweets from melody'
}
]
};
// Delete all rules and add just the ones we want
try {
const { data: rules } = await client.get('tweets/search/stream/rules');
logger.log({ level: 'debug', message: rules });
const ruleIds = rules.map((r) => r.id);
logger.log({ level: 'debug', message: ruleIds });
await client.post('tweets/search/stream/rules', {
'delete': {
"ids": ruleIds
}
});
}
catch (e) {
logger.log({ level: 'error', message: e });
logger.log({ level: 'error', message: 'no big d.' });
}
logger.log({ level: 'debug', message: `creating rule.` });
const ruleRes = await client.post('tweets/search/stream/rules', ruleBody);
logger.log({ level: 'debug', message: `rule created with response ${JSON.stringify(ruleRes)}` });
}
async function listenForever(streamFactory, dataConsumer) {
try {
for await (const { data } of streamFactory()) {
dataConsumer(data);
}
// The stream has been closed by Twitter. It is usually safe to reconnect.
logger.log({ level: 'debug', message: 'Stream disconnected healthily. Reconnecting.' });
listenForever(streamFactory, dataConsumer);
await delay(5000);
}
catch (error) {
// An error occurred so we reconnect to the stream. Note that we should
// probably have retry logic here to prevent reconnection after a number of
// closely timed failures (may indicate a problem that is not downstream).
logger.log({ level: 'warn', message: `Stream disconnected with error. Retrying. ${error}` });
listenForever(streamFactory, dataConsumer);
await delay(5000);
}
}
export default async function twitter(dataConsumer) {
const parameters = {
expansions: [
'author_id'
],
tweet: {
fields: ['created_at', 'entities'],
}
};
await setup();
listenForever(() => client.stream('tweets/search/stream', parameters), dataConsumer);
}

View File

@ -1,16 +1,8 @@
import { expect } from "chai";
import Room from '../../src/Room.ts'
import { describe } from "pm2";
describe('Room', function () {
describe('extractTopicId', function () {
it('should return id', function () {
expect(Room.extractTopicId('RoomStatusTopic#RoomStatusTopic:G0TWFS5')).to.equal('G0TWFS5')
})
it('should return id even when there is :n at the end', function () {
expect(Room.extractTopicId('RoomStatusTopic#RoomStatusTopic:G0TWFS5:3')).to.equal('G0TWFS5')
})
it('should return id even when there is a two digit number at the end', function () {
expect(Room.extractTopicId('RoomStatusTopic#RoomStatusTopic:G0TWFS5:11')).to.equal('G0TWFS5')
})
})
describe('')
})

View File

@ -1,6 +1,6 @@
import { alert } from '../../src/sound.js'
describe('sound', function() {
xdescribe('sound', function() {
it('alert', function() {
alert()
})

View File

@ -9,7 +9,9 @@
"allowImportingTsExtensions": false,
"outDir": "dist",
"lib": [
"es2018", "dom"
"es2018",
"dom",
"dom.iterable"
]
},
}