import { createCache } from 'cache-manager';
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { createWriteStream, existsSync, rmSync } from 'fs';
import { mkdirp } from 'fs-extra';
import { join } from 'path';
import { readdir } from 'fs/promises';
import { env } from '../config/env';
import Keyv from 'keyv';
import KeyvPostgres from '@keyv/postgres';
import logger from './logger';

const keyv = new Keyv(new KeyvPostgres({ uri: env.DATABASE_URL, schema: 'keyv' }));

keyv.on('error', (err) => {
  logger.error('keyv error encountered.');
  logger.error(err);
});

const cache = createCache({ stores: [keyv] });

const LOCK_TTL = 1000 * 60 * 3; // 3 minute lock timeout, in milliseconds
const RETRY_DELAY = 100;        // 100ms between retries
const MAX_RETRIES = 10;         // Max attempts to acquire the lock

export async function getOrDownloadAsset(client: S3Client, bucket: string, key: string): Promise<string> {
  logger.debug(`getOrDownloadAsset with bucket=${bucket} key=${key}`);

  if (!client) throw new Error('getOrDownloadAsset requires S3Client as first argument');
  if (!bucket) throw new Error('getOrDownloadAsset requires bucket as second argument');
  if (!key) throw new Error('getOrDownloadAsset requires key as third argument');

  const safeKey = key.replace(/[^a-zA-Z0-9_.-]/g, '_');
  const dir = join(env.CACHE_ROOT, bucket);
  const path = join(dir, safeKey);
  const cacheKey = `${bucket}:${key}`;
  const lockKey = `${cacheKey}:lock`;

  // 1. Check cache first (non-blocking)
  const cachedPath = await cache.get<string>(cacheKey);
  if (cachedPath && existsSync(cachedPath)) {
    return cachedPath;
  }

  // 2. Ensure directory exists
  await mkdirp(dir);

  // 3. Acquire distributed lock
  let acquiredLock = false;
  let retryCount = 0;

  while (retryCount < MAX_RETRIES && !acquiredLock) {
    // Attempt to take the lock: set it only if no other worker currently holds it.
    // Note: this get-then-set is best-effort rather than atomic, so two workers
    // could occasionally download the same asset concurrently.
    const existingLock = await cache.get(lockKey).catch(() => null);
    if (!existingLock) {
      await cache.set(lockKey, true, LOCK_TTL);
      acquiredLock = true;
    } else {
      retryCount++;
      await new Promise((resolve) => setTimeout(resolve, RETRY_DELAY));
    }
  }

  if (!acquiredLock) {
    throw new Error(`Failed to acquire lock for ${cacheKey} after ${MAX_RETRIES} attempts`);
  }

  try {
    // 4. Double-check cache after acquiring the lock
    const cachedPathAfterLock = await cache.get<string>(cacheKey);
    if (cachedPathAfterLock && existsSync(cachedPathAfterLock)) {
      return cachedPathAfterLock;
    }

    // 5. Download the file
    const command = new GetObjectCommand({ Bucket: bucket, Key: key });
    const response = await client.send(command);

    await new Promise<void>((resolve, reject) => {
      const stream = response.Body as NodeJS.ReadableStream;
      const fileStream = createWriteStream(path);
      stream.pipe(fileStream);
      // Resolve only once the data has been flushed to disk
      fileStream.on('finish', () => resolve());
      fileStream.on('error', reject);
      stream.on('error', (err) => {
        fileStream.destroy(err);
        reject(err);
      });
    });

    // 6. Cache the file path for 72 hours (TTL in milliseconds)
    await cache.set(cacheKey, path, 72 * 60 * 60 * 1000);
    return path;
  } finally {
    // 7. Release the lock
    await cache.del(lockKey).catch(() => { /* Ignore errors */ });
  }
}

export async function cleanExpiredFiles(): Promise<number> {
  let deletedCount = 0;
  const bucket = env.S3_BUCKET;
  const cacheDir = env.CACHE_ROOT;
  const files = await readdir(cacheDir);

  for (const file of files) {
    // readdir returns bare entry names, so the entry name itself is the key
    const key = file;
    const cacheKey = `${bucket}:${key}`;
    const fullPath = join(cacheDir, file);
    logger.debug(`file=${file} key=${key} cacheKey=${cacheKey}`);

    // Skip the per-bucket subdirectory created by getOrDownloadAsset
    if (file === bucket) continue;

    const stillCached = await cache.get(cacheKey);
    if (!stillCached) {
      try {
        rmSync(fullPath, { recursive: true, force: true });
        deletedCount++;
        logger.debug(`Deleted expired file: ${fullPath}`);
      } catch (err) {
        logger.warn(`Failed to delete file ${fullPath}:`, err);
      }
    }
  }

  return deletedCount;
}
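
// Example usage (a minimal sketch only; the module path './assetCache', the AWS
// region, and the bucket/key values below are illustrative assumptions, not part
// of this file):
//
//   import { S3Client } from '@aws-sdk/client-s3';
//   import { getOrDownloadAsset, cleanExpiredFiles } from './assetCache';
//
//   const s3 = new S3Client({ region: 'us-east-1' });
//   // Returns the local path, downloading from S3 only on a cache miss
//   const localPath = await getOrDownloadAsset(s3, 'my-bucket', 'reports/summary.pdf');
//   // Periodically remove files whose cache entries have expired
//   const removed = await cleanExpiredFiles();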