// src/tasks/consolidate_twitch_channel_rewards.ts import type { Task, Helpers } from "graphile-worker"; import { PrismaClient, User, type Pick } from '../../generated/prisma'; import { withAccelerate } from "@prisma/extension-accelerate"; import { env } from "../config/env"; import { constants } from "../config/constants"; import { getRateLimiter } from "../utils/rateLimiter"; const prisma = new PrismaClient().$extends(withAccelerate()); const cprPath = env.TWITCH_MOCK ? constants.twitch.dev.paths.channelPointRewards : constants.twitch.prod.paths.channelPointRewards; interface Payload { userId: number; } export interface TwitchChannelPointReward { id: string; broadcaster_id: string; cost: number; title: string; is_in_stock: boolean; [key: string]: any; } const getAuthToken = (user: User) => env.TWITCH_MOCK ? env.TWITCH_MOCK_USER_ACCESS_TOKEN : user.twitchToken?.accessToken; function assertPayload(payload: any): asserts payload is Payload { if (typeof payload !== "object" || !payload) throw new Error("invalid payload"); if (typeof payload.userId !== "number") throw new Error("invalid payload.userId"); } const getTwitchChannelPointRewards = async (user: User) => { if (!user?.twitchToken) throw new Error("Missing Twitch token"); const authToken = getAuthToken(user); const limiter = getRateLimiter(); await limiter.consume('twitch', 1); const query = new URLSearchParams({ broadcaster_id: user.twitchId }); const res = await fetch(`${env.TWITCH_API_ORIGIN}${cprPath}?${query}`, { headers: { 'Authorization': `Bearer ${authToken}`, 'Client-Id': env.TWITCH_CLIENT_ID } }); if (!res.ok) throw new Error(`Failed to fetch rewards: ${res.statusText}`); return res.json(); }; const createTwitchReward = async (user: User, pick: Pick) => { const authToken = getAuthToken(user); const limiter = getRateLimiter(); await limiter.consume('twitch', 1); console.log('pick as follows') console.log(pick) console.log(`pick?.waifu?.name=${pick?.waifu?.name}`) const query = new URLSearchParams({ broadcaster_id: user.twitchId }); const res = await fetch(`${env.TWITCH_API_ORIGIN}${cprPath}?${query}`, { method: 'POST', headers: { 'Authorization': `Bearer ${authToken}`, 'Client-Id': env.TWITCH_CLIENT_ID }, body: JSON.stringify({ cost: user.redeemCost, title: pick.waifu.name }) }); if (!res.ok) throw new Error(`Failed to create reward: ${res.statusText}`); const data = await res.json(); const rewardId = data.data?.[0]?.id; if (!rewardId) throw new Error("No reward ID returned"); await prisma.pick.update({ where: { id: pick.id }, data: { twitchChannelPointRewardId: rewardId } }); }; const deleteTwitchReward = async (user: User, rewardId: string) => { const authToken = getAuthToken(user); const limiter = getRateLimiter(); await limiter.consume('twitch', 1); const query = new URLSearchParams({ broadcaster_id: user.twitchId, id: rewardId }); const res = await fetch(`${env.TWITCH_API_ORIGIN}${cprPath}?${query}`, { method: 'DELETE', headers: { 'Authorization': `Bearer ${authToken}`, 'Client-Id': env.TWITCH_CLIENT_ID } }); if (!res.ok) throw new Error(`Failed to delete reward ${rewardId}`); }; const updateTwitchReward = async (user: User, rewardId: string, newCost: number) => { const authToken = getAuthToken(user); const limiter = getRateLimiter(); await limiter.consume('twitch', 1); const query = new URLSearchParams({ broadcaster_id: user.twitchId, id: rewardId }); const res = await fetch(`${env.TWITCH_API_ORIGIN}${cprPath}?${query}`, { method: 'PATCH', headers: { 'Authorization': `Bearer ${authToken}`, 'Client-Id': env.TWITCH_CLIENT_ID }, body: JSON.stringify({ cost: newCost }) }); if (!res.ok) throw new Error(`Failed to update reward ${rewardId}`); }; const consolidateTwitchRewards = async (userId: number) => { const user = await prisma.user.findFirstOrThrow({ where: { id: userId }, include: { twitchToken: true, Waifu: true } }); // Fetch picks (most recent N) const picks = await prisma.pick.findMany({ where: { userId }, orderBy: { createdAt: 'desc' }, take: constants.twitch.maxChannelPointRewards, include: { waifu: true } }); // Ensure every pick has a reward before processing Twitch side for (const pick of picks) { if (!pick.twitchChannelPointRewardId) { console.log(`Creating new reward for pick: ${pick.id}`); await createTwitchReward(user, pick); } } // Refresh picks after reward creation const updatedPicks = await prisma.pick.findMany({ where: { userId }, orderBy: { createdAt: 'desc' }, take: constants.twitch.maxChannelPointRewards }); // Get the most recent N reward IDs const currentPickIds = new Set( updatedPicks.slice(0, user.waifuChoicePoolSize).map(p => p.twitchChannelPointRewardId) ); console.log('currentPickIds as follows'); console.log(currentPickIds); // Fetch Twitch-side rewards const twitchData = await getTwitchChannelPointRewards(user); const twitchRewards: TwitchChannelPointReward[] = twitchData.data; // Delete or update Twitch rewards not in current pick set for (const reward of twitchRewards) { if (!updatedPicks.some(p => p.twitchChannelPointRewardId === reward.id)) continue; if (!currentPickIds.has(reward.id)) { console.log(`Deleting out-of-date reward: ${reward.id}`); await deleteTwitchReward(user, reward.id); } else if (reward.cost !== user.redeemCost) { console.log(`Updating reward cost for: ${reward.id}`); await updateTwitchReward(user, reward.id, user.redeemCost); } } }; const task: Task = async (payload: any, helpers) => { assertPayload(payload); await consolidateTwitchRewards(payload.userId); }; export default task;