This repository has been archived on 2023-12-05. You can view files and clone it, but cannot push or open issues or pull requests.
futureporn-qa/worker.ts

81 lines
2.7 KiB
TypeScript

import * as path from 'path';
import * as dotenv from 'dotenv'
import pkg from './package.json' assert {type: 'json'};
import { loggerFactory } from './src/logger.js';
import { type Logger } from 'winston';
import { dirname } from 'path';
import { fileURLToPath } from 'url';
import { loadTaskDefinitions } from './src/Task.js';
import { Worker, Job } from 'bullmq'
import { default as Redis } from 'ioredis'
import { REDIS_HOST, REDIS_PORT, TASK_LIST, IPFS_CLUSTER_HTTP_API_MULTIADDR } from './src/env.js';
// Redis connection shared by every BullMQ worker created below and also
// handed to each task through its runTask() arguments.
const connection = new Redis({
  // Explicit radix so the port string is always parsed as decimal.
  port: Number.parseInt(REDIS_PORT, 10),
  host: REDIS_HOST,
  // NOTE(review): BullMQ expects `maxRetriesPerRequest: null` on connections
  // used by workers — see the BullMQ connection docs before changing this.
  maxRetriesPerRequest: null,
});

// ESM has no built-in __dirname; derive it from this module's URL.
const __dirname = dirname(fileURLToPath(import.meta.url));

// NOTE(review): static imports are evaluated before this statement, so the
// values exported by ./src/env.js were already read by the time this runs.
// This call only affects later reads of process.env (e.g. the `env` passed to
// tasks) — confirm that ./src/env.js loads .env on its own.
dotenv.config();

// TASK_LIST is a comma-separated list of task names. Surrounding whitespace is
// stripped and empty entries (e.g. from a trailing comma) are dropped so that
// "a, b," yields ['a', 'b'] rather than ['a', ' b', ''].
const taskList = TASK_LIST.split(',')
  .map((name) => name.trim())
  .filter((name) => name.length > 0);

const logger = loggerFactory({ defaultMeta: { service: 'futureporn-qa' } });

logger.log({ level: 'info', message: `🧃 futureporn-qa worker version ${pkg.version}` });
logger.log({ level: 'info', message: `📃 worker for the following tasks:${taskList}` });
/**
 * Loads the task definitions selected by `taskList` and starts one BullMQ
 * worker per task. Each worker consumes the queue named after its task and
 * delegates every job to that task's `runTask()`.
 *
 * @param logger   Logger used for all lifecycle, error and failure messages.
 * @param taskList Names of the tasks this process should run workers for.
 * @returns A promise that resolves once every worker has been started. It does
 *          not wait for the workers to finish.
 */
async function main(logger: Logger, taskList: string[]): Promise<void> {
  const tasksDirectory = path.join(__dirname, 'src', 'tasks'); // Update the path as needed
  const activeTasks = await loadTaskDefinitions(logger, tasksDirectory, taskList);

  // Print the list of active tasks
  logger.log({ level: 'info', message: `👷 Active Tasks: ${activeTasks.map((task) => task.name)}` });

  for (const task of activeTasks) {
    logger.log({ level: 'info', message: `🈺 Initializing queue for ${task.name}` });
    logger.log({ level: 'info', message: `💪 Initializing worker for ${task.name}` });

    const worker = new Worker(
      task.name,
      async (job: Job) => {
        logger.log({ level: 'info', message: `🏃 Worker is doing the thing ${JSON.stringify(job.data)} ${task.name}` });
        const res = await task.runTask({
          env: process.env,
          logger: logger,
          job: job,
          connection: connection,
        });
        logger.log({ level: 'info', message: `🍻 Worker completed task ${task.name} (job ID ${job.id})` });
        return res;
      },
      {
        concurrency: 1,
        // Started manually below, after the event listeners are attached.
        autorun: false,
        connection,
      },
    );

    worker.on('error', (err) => {
      logger.log({ level: 'error', message: `🔥 Worker ${worker.id} encountered an error` });
      if (err instanceof Error) {
        logger.log({ level: 'error', message: `🔥 ${err.message}` });
      }
    });

    worker.on('failed', (job: Job | undefined, err: Error) => {
      // BullMQ may report a failure without a job; log that case once and stop
      // instead of falling through to the per-job message.
      if (!job) {
        logger.log({ level: 'error', message: `job failed. job was undefined. ${err.message}` });
        return;
      }
      // Log the job id and failure reason; interpolating the Job object itself
      // would only print "[object Object]".
      logger.log({
        level: 'warn',
        message: `💔 worker failed on job ${job.id} (${task.name}): ${err.message}`,
      });
    });

    // Not awaited: run() is long-lived (presumably it settles only when the
    // worker stops), so awaiting it here would keep later tasks from starting.
    worker.run();
  }
}
// Entry point. A rejection from main() is reported through the application
// logger and turned into a non-zero exit instead of being left as a floating
// promise.
main(logger, taskList).catch((err: unknown) => {
  const reason = err instanceof Error ? (err.stack ?? err.message) : String(err);
  logger.log({ level: 'error', message: `🔥 worker startup failed: ${reason}` });
  process.exitCode = 1;
  // Let the logger flush the message above before terminating; exiting
  // immediately could drop it.
  logger.on('finish', () => process.exit(1));
  logger.end();
});