import { type NotificationData } from '@futureporn/types'
import { type Helpers } from 'graphile-worker'
import {
  type IStreamResponse,
  type IStreamsResponse,
  type IPlatformNotificationResponse,
  type IVtubersResponse,
  type IVtuberResponse,
  type IVtuber,
} from '@futureporn/types'
import { subMinutes, addMinutes } from 'date-fns'
import qs from 'qs'
import { getProminentColor } from '@futureporn/image'
import { getImage } from '@futureporn/scout/vtuber.js'
import { fpSlugify } from '@futureporn/utils'
import { uploadFile } from '@futureporn/storage/s3.js'

/**
 * Step 3 of notification processing: find or create the stream record that
 * the given platform notification belongs to.
 *
 * A stream of the same vtuber whose date lies within +/- 30 minutes of `date`
 * is treated as the same stream (the streamer may be live on several platforms
 * at once). When such a stream exists it is updated in place; otherwise a new
 * stream record is created.
 *
 * @param args.date - Date of the notification (anything `new Date()` can parse).
 * @param args.vtuberId - ID of the vtuber record the stream belongs to.
 * @param args.platform - Platform the notification came from (`'fansly'` or `'chaturbate'` set the matching flag).
 * @param args.pNotifId - ID of the platform-notification record to link to the stream.
 * @param helpers - graphile-worker helpers; only `helpers.logger` is used.
 * @returns The ID of the stream that was found or created.
 * @throws Error when a required argument is missing, the date is not parseable,
 *   or the API responds with an error / without a stream ID.
 */
export async function upsertStream({ date, vtuberId, platform, pNotifId }: { date: string, vtuberId: number, platform: string, pNotifId: number }, helpers: Helpers): Promise<number> {
  if (!date) throw new Error(`upsertStream requires date in the arg object, but it was undefined`)
  if (!vtuberId) throw new Error(`upsertStream requires vtuberId in the arg object, but it was undefined`)
  if (!platform) throw new Error(`upsertStream requires platform in the arg object, but it was undefined`)
  if (!pNotifId) throw new Error(`upsertStream requires pNotifId in the arg object, but it was undefined`)

  // Fail with a readable message instead of the RangeError that toISOString()
  // would otherwise raise a few lines further down.
  const notifDate = new Date(date)
  if (Number.isNaN(notifDate.getTime())) throw new Error(`upsertStream received an invalid date: ${date}`)

  let streamId

  // # Step 3.
  // Finally we find or create the stream record
  // The stream may already be in the db (the streamer is multi-platform streaming), so we look for that record.
  // This gets a bit tricky. How do we determine one stream from another?
  // For now, the rule is 30 minutes of separation.
  // Anything <=30m is interpreted as the same stream. Anything >30m is interpreted as a different stream.
  // If the stream is not in the db, we create the stream record
  const dateSinceRange = subMinutes(notifDate, 30)
  const dateUntilRange = addMinutes(notifDate, 30)
  helpers.logger.info(`Find a stream within + or - 30 mins of the notif date=${notifDate.toISOString()}. dateSinceRange=${dateSinceRange.toISOString()}, dateUntilRange=${dateUntilRange.toISOString()}`)

  const findStreamQueryString = qs.stringify({
    populate: 'platform-notifications',
    filters: {
      date: {
        $gte: dateSinceRange,
        $lte: dateUntilRange
      },
      vtuber: {
        id: {
          '$eq': vtuberId
        }
      }
    }
  }, { encode: false })

  helpers.logger.info('>> findStream')
  const findStreamRes = await fetch(`${process.env.STRAPI_URL}/api/streams?${findStreamQueryString}`, {
    method: 'GET',
    headers: {
      'authorization': `Bearer ${process.env.SCOUT_STRAPI_API_KEY}`,
      'Content-Type': 'application/json'
    }
  })
  const findStreamData = await findStreamRes.json() as IStreamsResponse

  if (findStreamData?.data && findStreamData.data.length > 0) {
    helpers.logger.info('>> we found a findStreamData json. (there is an existing stream for this e-mail/notification)')
    helpers.logger.info(JSON.stringify(findStreamData, null, 2))

    const stream = findStreamData.data[0]
    if (!stream) throw new Error('stream was undefined')
    streamId = stream.id

    // Before we're done here, we need to do something extra. We need to populate isChaturbateStream and/or isFanslyStream.
    // We know which of these booleans to set based on the stream's related platformNotifications
    // plus the platform of the notification we are linking right now (it is not
    // part of the fetched relations yet, so it has to be accounted for explicitly).
    let isFanslyStream = platform === 'fansly'
    let isChaturbateStream = platform === 'chaturbate'

    // Collect the IDs that are already linked so the update extends the relation
    // rather than replacing it with only the new notification.
    // NOTE(review): assumes each populated notification exposes `id` next to `attributes` — confirm against the API types.
    const platformNotificationIds: number[] = []
    if (stream.attributes.platformNotifications) {
      for (const pn of stream.attributes.platformNotifications) {
        platformNotificationIds.push(pn.id)
        if (pn.attributes.platform === 'fansly') {
          isFanslyStream = true
        } else if (pn.attributes.platform === 'chaturbate') {
          isChaturbateStream = true
        }
      }
    }
    if (!platformNotificationIds.includes(pNotifId)) platformNotificationIds.push(pNotifId)

    helpers.logger.info(`>>> updating stream ${streamId}. isFanslyStream=${isFanslyStream}, isChaturbateStream=${isChaturbateStream}`)
    const updateStreamRes = await fetch(`${process.env.STRAPI_URL}/api/streams/${streamId}`, {
      method: 'PUT',
      headers: {
        'authorization': `Bearer ${process.env.SCOUT_STRAPI_API_KEY}`,
        'content-type': 'application/json'
      },
      body: JSON.stringify({
        data: {
          isFanslyStream,
          isChaturbateStream,
          platformNotifications: platformNotificationIds
        }
      })
    })
    const updateStreamJson = await updateStreamRes.json() as IStreamResponse
    if (updateStreamJson?.error) throw new Error(JSON.stringify(updateStreamJson, null, 2))
    helpers.logger.info(`>> assuming a successful update to the stream record. response as follows.`)
    helpers.logger.info(JSON.stringify(updateStreamJson, null, 2))
  }

  if (!streamId) {
    helpers.logger.info('>> did not find a streamId, so we go ahead and create a stream record in the db.')
    const createStreamPayload = {
      data: {
        isFanslyStream: platform === 'fansly',
        isChaturbateStream: platform === 'chaturbate',
        archiveStatus: 'missing',
        date: date,
        date2: date,
        date_str: date,
        vtuber: vtuberId,
        platformNotifications: [
          pNotifId
        ]
      }
    }
    helpers.logger.debug('>> createStreamPayload as follows')
    helpers.logger.debug(JSON.stringify(createStreamPayload, null, 2))

    const createStreamRes = await fetch(`${process.env.STRAPI_URL}/api/streams`, {
      method: 'POST',
      headers: {
        'authorization': `Bearer ${process.env.SCOUT_STRAPI_API_KEY}`,
        "Content-Type": "application/json"
      },
      body: JSON.stringify(createStreamPayload)
    })
    const createStreamJson = await createStreamRes.json() as IStreamResponse
    helpers.logger.debug('>> we got the createStreamJson')
    helpers.logger.debug(JSON.stringify(createStreamJson, null, 2))
    if (createStreamJson.error) {
      // Use the task logger (not console) so the failure lands in the job's log like everything else.
      helpers.logger.error(JSON.stringify(createStreamJson.error, null, 2))
      throw new Error('Failed to create stream in DB due to an error. (see above)')
    }
    streamId = createStreamJson.data.id
  }

  if (!streamId) throw new Error('failed to get streamId')
  return streamId
}

/**
 * Step 2 of notification processing: create the platform-notification record.
 *
 * Despite the name, no lookup is performed; a new record is always created
 * because a pre-existing notification is not expected.
 *
 * @param args.source - Where the notification came from (the task passes `'email'`).
 * @param args.date - Date of the notification; stored in both `date` and `date2`.
 * @param args.platform - Platform the notification came from.
 * @param args.vtuberId - ID of the vtuber record the notification is about.
 * @param helpers - graphile-worker helpers; only `helpers.logger` is used.
 * @returns The ID of the created platform-notification record.
 * @throws Error when a required argument is missing, the API responds with an
 *   error, or the response lacks an ID.
 */
export async function upsertPlatformNotification({ source, date, platform, vtuberId }: { source: string, date: string, platform: string, vtuberId: number }, helpers: Helpers): Promise<number> {
  helpers.logger.info('hello from upsertPlatformNotification', { source, date, platform, vtuberId })
  if (!source) throw new Error(`upsertPlatformNotification requires source arg, but it was undefined`)
  if (!date) throw new Error(`upsertPlatformNotification requires date arg, but it was undefined`)
  if (!platform) throw new Error(`upsertPlatformNotification requires platform arg, but it was undefined`)
  if (!vtuberId) throw new Error(`upsertPlatformNotification requires vtuberId arg, but it was undefined`)

  // # Step 2.
  // Next we create the platform-notification record.
  // This probably doesn't already exist, so we don't check for a pre-existing platform-notification.
  const pNotifPayload = {
    data: {
      source: source,
      date: date,
      date2: date,
      platform: platform,
      vtuber: vtuberId,
    }
  }
  helpers.logger.debug('pNotifPayload as follows')
  helpers.logger.debug(JSON.stringify(pNotifPayload, null, 2))

  const pNotifCreateRes = await fetch(`${process.env.STRAPI_URL}/api/platform-notifications`, {
    method: 'POST',
    headers: {
      'authorization': `Bearer ${process.env.SCOUT_STRAPI_API_KEY}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify(pNotifPayload)
  })
  const pNotifData = await pNotifCreateRes.json() as IPlatformNotificationResponse
  if (pNotifData.error) {
    helpers.logger.error('>> we failed to create platform-notification, there was an error in the response')
    helpers.logger.error(JSON.stringify(pNotifData.error, null, 2))
    // The error payload is an object; serialize it so the message is not "[object Object]".
    throw new Error(`Failed to create platform-notification: ${JSON.stringify(pNotifData.error)}`)
  }
  helpers.logger.debug(`>> pNotifData (json response) is as follows`)
  helpers.logger.debug(JSON.stringify(pNotifData, null, 2))

  if (!pNotifData.data?.id) throw new Error('failed to created pNotifData! The response was missing an id')
  return pNotifData.data.id
}

/**
 * Step 1 of notification processing: find or create the vtuber record.
 *
 * The vtuber is looked up by its Chaturbate URL (`platform === 'chaturbate'`)
 * or by its Fansly user ID (`platform === 'fansly'`). When no record matches,
 * a profile image is fetched, its prominent color is extracted, the image is
 * uploaded to storage, and a new vtuber record is created.
 *
 * @param args.platform - `'chaturbate'` or `'fansly'`; any other value is rejected.
 * @param args.userId - Fansly user ID; required when `platform` is `'fansly'`.
 * @param args.url - Channel URL on the platform.
 * @param args.channel - Channel name; used as display name and slug source for new records.
 * @param helpers - graphile-worker helpers; only `helpers.logger` is used.
 * @returns The ID of the vtuber that was found or created.
 * @throws Error when the platform is unsupported, a Fansly lookup lacks a
 *   `userId`, the lookup matches more than one vtuber, or no ID could be obtained.
 */
export async function upsertVtuber({ platform, userId, url, channel }: { platform: string, userId: string | null, url: string, channel: string }, helpers: Helpers): Promise<number> {
  let vtuberId
  helpers.logger.debug('>> # Step 1, upsertVtuber')

  // # Step 1.
  // First we find or create the vtuber
  // The vtuber may already be in the db, so we look for that record. All we need is the Vtuber ID.
  // If the vtuber is not in the db, we create the vtuber record.

  // GET /api/:pluralApiId?filters[field][operator]=value
  const findVtubersFilters = (() => {
    if (platform === 'chaturbate') {
      return { chaturbate: { $eq: url } }
    } else if (platform === 'fansly') {
      if (!userId) throw new Error('Fansly userId was undefined, but it is required.')
      return { fanslyId: { $eq: userId } }
    }
    // Without a filter the query below would match every vtuber and we could
    // attach the notification to an unrelated record, so refuse early.
    throw new Error(`upsertVtuber does not support platform=${platform}`)
  })()
  helpers.logger.debug('>>>>> the following is findVtubersFilters.')
  helpers.logger.debug(JSON.stringify(findVtubersFilters, null, 2))

  const findVtubersQueryString = qs.stringify({
    filters: findVtubersFilters
  }, { encode: false })
  helpers.logger.debug(`>>>>> platform=${platform}, url=${url}, userId=${userId}`)

  helpers.logger.debug('>> findVtuber')
  const findVtuberRes = await fetch(`${process.env.STRAPI_URL}/api/vtubers?${findVtubersQueryString}`, {
    method: 'GET',
    headers: {
      'content-type': 'application/json'
    }
  })
  const findVtuberJson = await findVtuberRes.json() as IVtubersResponse
  helpers.logger.debug('>> here is the vtuber json')
  helpers.logger.debug(JSON.stringify(findVtuberJson, null, 2))

  if (findVtuberJson?.data && findVtuberJson.data.length > 0) {
    helpers.logger.debug('>> a vtuber was FOUND')
    if (findVtuberJson.data.length > 1) throw new Error('There were more than one vtuber matches in the response. There must only be one.')
    const vtuber = findVtuberJson.data[0]
    if (!vtuber?.id) throw new Error('vtuber did not have an id. vtuber must have an id.')
    helpers.logger.debug('here is the findVtuberJson (as follows)')
    helpers.logger.debug(JSON.stringify(findVtuberJson, null, 2))
    helpers.logger.debug(`the matching vtuber has ID=${vtuber.id} (${vtuber.attributes.displayName})`)
    // Remember the match; without this assignment the create branch below
    // would run on every call and produce duplicate vtuber records.
    vtuberId = vtuber.id
  }

  if (!vtuberId) {
    helpers.logger.info('>> vtuberId was not found so we create')

    /**
     * We are creating a vtuber record.
     * We need a few things.
     *   * image URL
     *   * themeColor
     *
     * To get an image, we have to do a few things.
     *   * [x] download image from platform
     *   * [x] get themeColor from image
     *   * [x] upload image to b2
     *   * [x] get B2 cdn link to image
     *
     * To get themeColor, we need the image locally where we can then run
     * getProminentColor on it.
     */

    const slug = fpSlugify(channel)

    // download image from platform
    // vtuber.getImage expects a vtuber object, which we don't have yet, so we create a dummy one
    const dummyVtuber: IVtuber = {
      id: 69,
      attributes: {
        slug,
        displayName: 'example',
        vods: [],
        description1: ' ',
        image: ' ',
        themeColor: ' ',
        fanslyId: platform === 'fansly' && userId ? userId : undefined
      }
    }
    const imageFile = await getImage(dummyVtuber)

    // get themeColor from image
    const themeColor = await getProminentColor(imageFile)

    // upload image to b2
    const b2FileData = await uploadFile(imageFile)

    // get b2 cdn link to image
    const imageCdnLink = `${process.env.CDN_BUCKET_URL}/${b2FileData.Key}`

    helpers.logger.info(`>>> createVtuberRes here we go 3-2-1, POST!`)
    const createVtuberRes = await fetch(`${process.env.STRAPI_URL}/api/vtubers`, {
      method: 'POST',
      headers: {
        'authorization': `Bearer ${process.env.SCOUT_STRAPI_API_KEY}`,
        'content-type': 'application/json'
      },
      body: JSON.stringify({
        data: {
          displayName: channel,
          fansly: (platform === 'fansly') ? url : null,
          fanslyId: (platform === 'fansly') ? userId : null,
          chaturbate: (platform === 'chaturbate') ? url : null,
          slug,
          description1: ' ',
          image: imageCdnLink,
          themeColor: themeColor || '#dde1ec'
        }
      })
    })
    const createVtuberJson = await createVtuberRes.json() as IVtuberResponse
    helpers.logger.info('>> createVtuberJson as follows')
    helpers.logger.info(JSON.stringify(createVtuberJson, null, 2))
    if (createVtuberJson.data) {
      vtuberId = createVtuberJson.data.id
      helpers.logger.info(`>>> vtuber created with id=${vtuberId}`)
    }
  }

  if (!vtuberId) throw new Error(`upsertVtuber failed to produce a vtuberId! This should not happen under normal circumstances.`)
  return vtuberId
}

/**
 * Task entry point: turns one e-mail notification into database records.
 *
 * Runs the three steps in order, each feeding its ID to the next:
 * vtuber -> platform notification -> stream.
 *
 * @param payload - The parsed notification (url, platform, channel, displayName, date, userId, avatar).
 * @param helpers - graphile-worker helpers, passed through to every step.
 * @returns A summary string containing the three record IDs.
 */
export default async function (payload: NotificationData, helpers: Helpers) {
  const source = 'email'
  const { url, platform, channel, displayName, date, userId, avatar } = payload
  helpers.logger.info(`process_notif_email task execution has begun with date=${date}, channel=${channel}, platform=${platform}, url=${url}, displayName=${displayName}, avatar=${avatar}`)

  const vtuberId = await upsertVtuber({ channel, platform, url, userId }, helpers)
  const pNotifId = await upsertPlatformNotification({ date, platform, source, vtuberId }, helpers)
  const streamId = await upsertStream({ date, platform, pNotifId, vtuberId }, helpers)

  return `vtuberId: ${vtuberId} | pNotifId: ${pNotifId} | streamId: ${streamId}`
}