335 lines
14 KiB
TypeScript
335 lines
14 KiB
TypeScript
|
|
||
|
import { type NotificationData } from '@futureporn/types'
|
||
|
import { type Helpers } from 'graphile-worker'
|
||
|
import {
|
||
|
type IStreamResponse,
|
||
|
type IStreamsResponse,
|
||
|
type IPlatformNotificationResponse,
|
||
|
type IVtubersResponse,
|
||
|
type IVtuberResponse,
|
||
|
type IVtuber,
|
||
|
} from '@futureporn/types'
|
||
|
import { subMinutes, addMinutes } from 'date-fns'
|
||
|
import qs from 'qs'
|
||
|
import { getProminentColor } from '@futureporn/image'
|
||
|
import { getImage } from '@futureporn/scout/vtuber.js'
|
||
|
import { fpSlugify } from '@futureporn/utils'
|
||
|
import { uploadFile } from '@futureporn/storage/s3.js'
|
||
|
|
||
|
export async function upsertStream({
|
||
|
date,
|
||
|
vtuberId,
|
||
|
platform,
|
||
|
pNotifId
|
||
|
}: {
|
||
|
date: string,
|
||
|
vtuberId: number,
|
||
|
platform: string,
|
||
|
pNotifId: number
|
||
|
}, helpers: Helpers): Promise<number> {
|
||
|
|
||
|
if (!date) throw new Error(`upsertStream requires date in the arg object, but it was undefined`);
|
||
|
if (!vtuberId) throw new Error(`upsertStream requires vtuberId in the arg object, but it was undefined`);
|
||
|
if (!platform) throw new Error(`upsertStream requires platform in the arg object, but it was undefined`);
|
||
|
if (!pNotifId) throw new Error(`upsertStream requires pNotifId in the arg object, but it was undefined`);
|
||
|
|
||
|
let streamId
|
||
|
// # Step 3.
|
||
|
// Finally we find or create the stream record
|
||
|
// The stream may already be in the db (the streamer is multi-platform streaming), so we look for that record.
|
||
|
// This gets a bit tricky. How do we determine one stream from another?
|
||
|
// For now, the rule is 30 minutes of separation.
|
||
|
// Anything <=30m is interpreted as the same stream. Anything >30m is interpreted as a different stream.
|
||
|
// If the stream is not in the db, we create the stream record
|
||
|
const dateSinceRange = subMinutes(new Date(date), 30)
|
||
|
const dateUntilRange = addMinutes(new Date(date), 30)
|
||
|
helpers.logger.info(`Find a stream within + or - 30 mins of the notif date=${new Date(date).toISOString()}. dateSinceRange=${dateSinceRange.toISOString()}, dateUntilRange=${dateUntilRange.toISOString()}`)
|
||
|
const findStreamQueryString = qs.stringify({
|
||
|
populate: 'platform-notifications',
|
||
|
filters: {
|
||
|
date: {
|
||
|
$gte: dateSinceRange,
|
||
|
$lte: dateUntilRange
|
||
|
},
|
||
|
vtuber: {
|
||
|
id: {
|
||
|
'$eq': vtuberId
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}, { encode: false })
|
||
|
|
||
|
helpers.logger.info('>> findStream')
|
||
|
const findStreamRes = await fetch(`${process.env.STRAPI_URL}/api/streams?${findStreamQueryString}`, {
|
||
|
method: 'GET',
|
||
|
headers: {
|
||
|
'authorization': `Bearer ${process.env.SCOUT_STRAPI_API_KEY}`,
|
||
|
'Content-Type': 'application/json'
|
||
|
}
|
||
|
})
|
||
|
const findStreamData = await findStreamRes.json() as IStreamsResponse
|
||
|
if (findStreamData?.data && findStreamData.data.length > 0) {
|
||
|
helpers.logger.info('>> we found a findStreamData json. (there is an existing stream for this e-mail/notification)')
|
||
|
helpers.logger.info(JSON.stringify(findStreamData, null, 2))
|
||
|
const stream = findStreamData.data[0]
|
||
|
if (!stream) throw new Error('stream was undefined');
|
||
|
streamId = stream.id
|
||
|
|
||
|
// Before we're done here, we need to do something extra. We need to populate isChaturbateStream and/or isFanslyStream.
|
||
|
// We know which of these booleans to set based on the stream's related platformNotifications
|
||
|
// We go through each pNotif and look at it's platform
|
||
|
let isFanslyStream = false
|
||
|
let isChaturbateStream = false
|
||
|
if (stream.attributes.platformNotifications) {
|
||
|
for (const pn of stream.attributes.platformNotifications) {
|
||
|
if (pn.attributes.platform === 'fansly') {
|
||
|
isFanslyStream = true
|
||
|
} else if (pn.attributes.platform === 'chaturbate') {
|
||
|
isChaturbateStream = true
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
helpers.logger.info(`>>> updating stream ${streamId}. isFanslyStream=${isFanslyStream}, isChaturbateStream=${isChaturbateStream}`)
|
||
|
const updateStreamRes = await fetch(`${process.env.STRAPI_URL}/api/streams/${streamId}`, {
|
||
|
method: 'PUT',
|
||
|
headers: {
|
||
|
'authorization': `Bearer ${process.env.SCOUT_STRAPI_API_KEY}`,
|
||
|
'content-type': 'application/json'
|
||
|
},
|
||
|
body: JSON.stringify({
|
||
|
data: {
|
||
|
isFanslyStream: isFanslyStream,
|
||
|
isChaturbateStream: isChaturbateStream,
|
||
|
platformNotifications: [
|
||
|
pNotifId
|
||
|
]
|
||
|
}
|
||
|
})
|
||
|
})
|
||
|
const updateStreamJson = await updateStreamRes.json() as IStreamResponse
|
||
|
if (updateStreamJson?.error) throw new Error(JSON.stringify(updateStreamJson, null, 2));
|
||
|
helpers.logger.info(`>> assuming a successful update to the stream record. response as follows.`)
|
||
|
helpers.logger.info(JSON.stringify(updateStreamJson, null, 2))
|
||
|
}
|
||
|
|
||
|
if (!streamId) {
|
||
|
helpers.logger.info('>> did not find a streamId, so we go ahead and create a stream record in the db.')
|
||
|
const createStreamPayload = {
|
||
|
data: {
|
||
|
isFanslyStream: (platform === 'fansly') ? true : false,
|
||
|
isChaturbateStream: (platform === 'chaturbate') ? true : false,
|
||
|
archiveStatus: 'missing',
|
||
|
date: date,
|
||
|
date2: date,
|
||
|
date_str: date,
|
||
|
vtuber: vtuberId,
|
||
|
platformNotifications: [
|
||
|
pNotifId
|
||
|
]
|
||
|
}
|
||
|
}
|
||
|
helpers.logger.debug('>> createStreamPayload as follows')
|
||
|
helpers.logger.debug(JSON.stringify(createStreamPayload, null, 2))
|
||
|
const createStreamRes = await fetch(`${process.env.STRAPI_URL}/api/streams`, {
|
||
|
method: 'POST',
|
||
|
headers: {
|
||
|
'authorization': `Bearer ${process.env.SCOUT_STRAPI_API_KEY}`,
|
||
|
"Content-Type": "application/json"
|
||
|
},
|
||
|
body: JSON.stringify(createStreamPayload)
|
||
|
})
|
||
|
const createStreamJson = await createStreamRes.json() as IStreamResponse
|
||
|
helpers.logger.debug('>> we got the createStreamJson')
|
||
|
helpers.logger.debug(JSON.stringify(createStreamJson, null, 2))
|
||
|
if (createStreamJson.error) {
|
||
|
console.error(JSON.stringify(createStreamJson.error, null, 2))
|
||
|
throw new Error('Failed to create stream in DB due to an error. (see above)')
|
||
|
}
|
||
|
streamId = createStreamJson.data.id
|
||
|
}
|
||
|
|
||
|
if (!streamId) throw new Error('failed to get streamId')
|
||
|
return streamId
|
||
|
}
|
||
|
|
||
|
|
||
|
export async function upsertPlatformNotification({ source, date, platform, vtuberId }: { source: string, date: string, platform: string, vtuberId: number }, helpers: Helpers): Promise<number> {
|
||
|
helpers.logger.info('hello from upsertPlatformNotification', { source, date, platform, vtuberId });
|
||
|
|
||
|
if (!source) throw new Error(`upsertPlatformNotification requires source arg, but it was undefined`);
|
||
|
if (!date) throw new Error(`upsertPlatformNotification requires date arg, but it was undefined`);
|
||
|
if (!platform) throw new Error(`upsertPlatformNotification requires platform arg, but it was undefined`);
|
||
|
if (!vtuberId) throw new Error(`upsertPlatformNotification requires vtuberId arg, but it was undefined`);
|
||
|
|
||
|
let pNotifId
|
||
|
// # Step 2.
|
||
|
// Next we create the platform-notification record.
|
||
|
// This probably doesn't already exist, so we don't check for a pre-existing platform-notification.
|
||
|
const pNotifPayload = {
|
||
|
data: {
|
||
|
source: source,
|
||
|
date: date,
|
||
|
date2: date,
|
||
|
platform: platform,
|
||
|
vtuber: vtuberId,
|
||
|
}
|
||
|
}
|
||
|
helpers.logger.debug('pNotifPayload as follows')
|
||
|
helpers.logger.debug(JSON.stringify(pNotifPayload, null, 2))
|
||
|
|
||
|
const pNotifCreateRes = await fetch(`${process.env.STRAPI_URL}/api/platform-notifications`, {
|
||
|
method: 'POST',
|
||
|
headers: {
|
||
|
'authorization': `Bearer ${process.env.SCOUT_STRAPI_API_KEY}`,
|
||
|
'Content-Type': 'application/json'
|
||
|
},
|
||
|
body: JSON.stringify(pNotifPayload)
|
||
|
})
|
||
|
const pNotifData = await pNotifCreateRes.json() as IPlatformNotificationResponse
|
||
|
if (pNotifData.error) {
|
||
|
helpers.logger.error('>> we failed to create platform-notification, there was an error in the response')
|
||
|
helpers.logger.error(JSON.stringify(pNotifData.error, null, 2))
|
||
|
throw new Error(pNotifData.error)
|
||
|
}
|
||
|
helpers.logger.debug(`>> pNotifData (json response) is as follows`)
|
||
|
helpers.logger.debug(JSON.stringify(pNotifData, null, 2))
|
||
|
if (!pNotifData.data?.id) throw new Error('failed to created pNotifData! The response was missing an id');
|
||
|
|
||
|
pNotifId = pNotifData.data.id
|
||
|
if (!pNotifId) throw new Error('failed to get Platform Notification ID');
|
||
|
return pNotifId
|
||
|
|
||
|
}
|
||
|
export async function upsertVtuber({ platform, userId, url, channel }: { platform: string, userId: string | null, url: string, channel: string }, helpers: Helpers): Promise<number> {
|
||
|
|
||
|
let vtuberId
|
||
|
helpers.logger.debug('>> # Step 1, upsertVtuber')
|
||
|
// # Step 1.
|
||
|
// First we find or create the vtuber
|
||
|
// The vtuber may already be in the db, so we look for that record. All we need is the Vtuber ID.
|
||
|
// If the vtuber is not in the db, we create the vtuber record.
|
||
|
|
||
|
// GET /api/:pluralApiId?filters[field][operator]=value
|
||
|
const findVtubersFilters = (() => {
|
||
|
if (platform === 'chaturbate') {
|
||
|
return { chaturbate: { $eq: url } }
|
||
|
} else if (platform === 'fansly') {
|
||
|
if (!userId) throw new Error('Fansly userId was undefined, but it is required.')
|
||
|
return { fanslyId: { $eq: userId } }
|
||
|
}
|
||
|
})()
|
||
|
helpers.logger.debug('>>>>> the following is findVtubersFilters.')
|
||
|
helpers.logger.debug(JSON.stringify(findVtubersFilters, null, 2))
|
||
|
|
||
|
const findVtubersQueryString = qs.stringify({
|
||
|
filters: findVtubersFilters
|
||
|
}, { encode: false })
|
||
|
helpers.logger.debug(`>>>>> platform=${platform}, url=${url}, userId=${userId}`)
|
||
|
|
||
|
helpers.logger.debug('>> findVtuber')
|
||
|
const findVtuberRes = await fetch(`${process.env.STRAPI_URL}/api/vtubers?${findVtubersQueryString}`, {
|
||
|
method: 'GET',
|
||
|
headers: {
|
||
|
'content-type': 'application/json'
|
||
|
}
|
||
|
})
|
||
|
const findVtuberJson = await findVtuberRes.json() as IVtubersResponse
|
||
|
helpers.logger.debug('>> here is the vtuber json')
|
||
|
helpers.logger.debug(JSON.stringify(findVtuberJson, null, 2))
|
||
|
if (findVtuberJson?.data && findVtuberJson.data.length > 0) {
|
||
|
helpers.logger.debug('>> a vtuber was FOUND')
|
||
|
if (findVtuberJson.data.length > 1) throw new Error('There were more than one vtuber matches in the response. There must only be one.');
|
||
|
const vtuber = findVtuberJson.data[0]
|
||
|
if (!vtuber) throw new Error('vtuber did not have an id. vtuber must have an id.')
|
||
|
helpers.logger.debug('here is the findVtuberJson (as follows)')
|
||
|
helpers.logger.debug(JSON.stringify(findVtuberJson, null, 2))
|
||
|
helpers.logger.debug(`the matching vtuber has ID=${vtuber.id} (${vtuber.attributes.displayName})`)
|
||
|
}
|
||
|
|
||
|
if (!vtuberId) {
|
||
|
helpers.logger.info('>> vtuberId was not found so we create')
|
||
|
|
||
|
/**
|
||
|
* We are creating a vtuber record.
|
||
|
* We need a few things.
|
||
|
* * image URL
|
||
|
* * themeColor
|
||
|
*
|
||
|
* To get an image, we have to do a few things.
|
||
|
* * [x] download image from platform
|
||
|
* * [x] get themeColor from image
|
||
|
* * [x] upload image to b2
|
||
|
* * [x] get B2 cdn link to image
|
||
|
*
|
||
|
* To get themeColor, we need the image locally where we can then run
|
||
|
*/
|
||
|
|
||
|
// download image from platform
|
||
|
// vtuber.getImage expects a vtuber object, which we don't have yet, so we create a dummy one
|
||
|
const dummyVtuber: IVtuber = {
|
||
|
id: 69,
|
||
|
attributes: {
|
||
|
slug: fpSlugify(channel),
|
||
|
displayName: 'example',
|
||
|
vods: [],
|
||
|
description1: ' ',
|
||
|
image: ' ',
|
||
|
themeColor: ' ',
|
||
|
fanslyId: (platform === 'fansly') ? (userId ? userId : undefined) : undefined
|
||
|
}
|
||
|
}
|
||
|
const imageFile = await getImage(dummyVtuber)
|
||
|
|
||
|
// get themeColor from image
|
||
|
const themeColor = await getProminentColor(imageFile)
|
||
|
|
||
|
// upload image to b2
|
||
|
const b2FileData = await uploadFile(imageFile)
|
||
|
|
||
|
// get b2 cdn link to image
|
||
|
const imageCdnLink = `${process.env.CDN_BUCKET_URL}/${b2FileData.Key}`
|
||
|
|
||
|
helpers.logger.info(`>>> createVtuberRes here we go 3-2-1, POST!`)
|
||
|
const createVtuberRes = await fetch(`${process.env.STRAPI_URL}/api/vtubers`, {
|
||
|
method: 'POST',
|
||
|
headers: {
|
||
|
'authorization': `Bearer ${process.env.SCOUT_STRAPI_API_KEY}`,
|
||
|
'content-type': 'application/json'
|
||
|
},
|
||
|
body: JSON.stringify({
|
||
|
data: {
|
||
|
displayName: channel,
|
||
|
fansly: (platform === 'fansly') ? url : null,
|
||
|
fanslyId: (platform === 'fansly') ? userId : null,
|
||
|
chaturbate: (platform === 'chaturbate') ? url : null,
|
||
|
slug: fpSlugify(channel),
|
||
|
description1: ' ',
|
||
|
image: imageCdnLink,
|
||
|
themeColor: themeColor || '#dde1ec'
|
||
|
}
|
||
|
})
|
||
|
})
|
||
|
const createVtuberJson = await createVtuberRes.json() as IVtuberResponse
|
||
|
helpers.logger.info('>> createVtuberJson as follows')
|
||
|
helpers.logger.info(JSON.stringify(createVtuberJson, null, 2))
|
||
|
if (createVtuberJson.data) {
|
||
|
vtuberId = createVtuberJson.data.id
|
||
|
helpers.logger.info(`>>> vtuber created with id=${vtuberId}`)
|
||
|
}
|
||
|
}
|
||
|
if (!vtuberId) throw new Error(`upsertVtuber failed to produce a vtuberId! This should not happen under normal circumstances.`);
|
||
|
return vtuberId
|
||
|
}
|
||
|
|
||
|
export default async function (payload: NotificationData, helpers: Helpers) {
|
||
|
const source = 'email'
|
||
|
const { url, platform, channel, displayName, date, userId, avatar } = payload
|
||
|
helpers.logger.info(`process_notif_email task execution has begun with date=${date}, channel=${channel}, platform=${platform}, url=${url}, displayName=${displayName}, avatar=${avatar}`);
|
||
|
const vtuberId = await upsertVtuber({channel, platform, url, userId }, helpers);
|
||
|
const pNotifId = await upsertPlatformNotification({date, platform, source, vtuberId}, helpers);
|
||
|
const streamId = await upsertStream({date, platform, pNotifId, vtuberId}, helpers);
|
||
|
return `vtuberId: ${vtuberId} | pNotifId: ${pNotifId} | streamId: ${streamId}`;
|
||
|
}
|