#!/usr/bin/env node
/**
 * 🔱 GOD MODE WORKER
 * ==================
 * BullMQ worker for background job processing.
 * Connects to Redis for queue management and PostgreSQL for data operations.
 *
 * Usage:
 *   npm run worker
 *
 * Production (PM2):
 *   pm2 start scripts/start-worker.js --name "god-mode-worker"
 */

import { Worker } from 'bullmq';
import IORedis from 'ioredis';
import pg from 'pg';

// =============================================================================
// 1. CONFIGURATION
// =============================================================================
// Ensure these match your Docker/Environment variables.
// Note: `||` means an unset OR empty-string variable falls back to the default.
const REDIS_URL = process.env.REDIS_URL || 'redis://127.0.0.1:6379';
const DATABASE_URL =
  process.env.DATABASE_URL || 'postgres://postgres:postgres@localhost:5432/spark';

// Queue name must match what you define in your Astro API routes
const QUEUE_NAME = 'god-mode-queue';

// =============================================================================
// 2. DATABASE CONNECTION (Singleton Pool)
// =============================================================================
// We create a pool here so the worker can talk to Postgres directly
const dbPool = new pg.Pool({
  connectionString: DATABASE_URL,
  max: 5, // Keep connection count low for workers to save DB resources
  idleTimeoutMillis: 30000,
});

// node-postgres emits 'error' on the pool when an idle client fails (backend
// restart, network partition). An 'error' event with no listener is thrown by
// the EventEmitter, which would take the whole worker process down.
dbPool.on('error', (err) => {
  console.error('[Worker] Idle PostgreSQL client error:', err);
});

// =============================================================================
// 3. REDIS CONNECTION
// =============================================================================
// We use a specific connection for the worker to avoid blocking the main app
const redisConnection = new IORedis(REDIS_URL, {
  maxRetriesPerRequest: null, // Required by BullMQ - prevents crashes on Redis hiccups
});

console.log(`🔱 [God Mode Worker] Starting up... listening to queue: "${QUEUE_NAME}"`);
// =============================================================================
// 4. THE JOB PROCESSOR
// =============================================================================

/**
 * Processor function run for every job taken from the queue.
 *
 * Dispatches on `job.name` to the matching handler, logs the elapsed time, and
 * - only when the handler succeeded - records a `job_complete` row in the
 * `work_log` table. Failed jobs are not recorded here, so a failure is never
 * logged as `result: 'success'`.
 *
 * @param {{ id: *, name: string, data: object }} job - Queue job; only `id`,
 *   `name` and `data` are read.
 * @returns {Promise<*>} Whatever the selected handler resolves with.
 * @throws {Error} When `job.name` is not a known job type; handler errors
 *   propagate unchanged.
 */
const processJob = async (job) => {
  console.log(`[Job ${job.id}] Processing ${job.name}...`);

  // Track execution time for monitoring
  const start = Date.now();
  let duration;
  let result;

  try {
    switch (job.name) {
      case 'generate-content':
        result = await handleContentGeneration(job.data);
        break;
      case 'generate-report':
        result = await handleReportGeneration(job.data);
        break;
      case 'sync-sitemap':
        result = await handleSitemapSync(job.data);
        break;
      case 'campaign-blast':
        result = await handleCampaignBlast(job.data);
        break;
      case 'refactor-posts':
        result = await handlePostRefactor(job.data);
        break;
      default:
        throw new Error(`Unknown job name: ${job.name}`);
    }
  } finally {
    // Runs on success and on failure, so the timing line is always printed.
    duration = Date.now() - start;
    console.log(`[Job ${job.id}] Finished in ${duration}ms`);
  }

  // Only reached when the handler did not throw.
  try {
    await dbPool.query(
      `
      INSERT INTO work_log (action, entity_type, entity_id, details, timestamp)
      VALUES ($1, $2, $3, $4, NOW())
      `,
      ['job_complete', job.name, job.id, JSON.stringify({ duration, result: 'success' })],
    );
  } catch (logError) {
    // Best effort: a logging failure must not fail the job, but it should be visible.
    console.warn(`[Job ${job.id}] Could not write work_log entry: ${logError.message}`);
  }

  return result;
};
// =============================================================================
// 5. JOB HANDLERS
// =============================================================================

/**
 * Resolve after `ms` milliseconds. Used to simulate work in the placeholder handlers.
 *
 * @param {number} ms - Delay in milliseconds.
 * @returns {Promise<void>}
 */
function sleep(ms) {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Run a content-generation job in batches, persisting progress in `generation_jobs`.
 *
 * The generation itself is a placeholder (a 1 s pause per batch). Progress starts
 * from the row's `current_offset` and stops at its `target_quantity` (10 when unset).
 *
 * @param {{ jobId: *, batchSize?: number, mode?: string }} data - Job payload.
 * @returns {Promise<{ jobId: *, processed: number, status: string }>}
 * @throws {Error} When `batchSize` is not a positive number (it would otherwise
 *   never advance the loop) or when no `generation_jobs` row has id `jobId`.
 */
async function handleContentGeneration(data) {
  const { jobId, batchSize = 5, mode = 'generate' } = data;

  // Validate before touching the database: a zero, negative or non-numeric
  // batch size would make the progress loop below spin forever.
  const step = Number(batchSize);
  if (!Number.isFinite(step) || step <= 0) {
    throw new Error(`Invalid batchSize for job ${jobId}: ${batchSize}`);
  }

  console.log(`[Content Generation] Job ${jobId}, batch size: ${batchSize}, mode: ${mode}`);

  // Fetch job details from database
  const { rows: jobs } = await dbPool.query(
    'SELECT * FROM generation_jobs WHERE id = $1',
    [jobId],
  );
  if (jobs.length === 0) {
    throw new Error(`Job ${jobId} not found`);
  }
  const job = jobs[0];

  // Update job status to processing
  await dbPool.query(
    'UPDATE generation_jobs SET status = $1 WHERE id = $2',
    ['Processing', jobId],
  );

  // Process in batches (placeholder for actual generation logic)
  const totalToProcess = job.target_quantity || 10;
  let processed = job.current_offset || 0;

  while (processed < totalToProcess) {
    // Simulate batch processing
    await sleep(1000);

    // Clamp so neither the stored offset nor the log line overshoots the target.
    processed = Math.min(processed + step, totalToProcess);

    // Update progress
    await dbPool.query(
      'UPDATE generation_jobs SET current_offset = $1 WHERE id = $2',
      [processed, jobId],
    );
    console.log(`[Content Generation] Progress: ${processed}/${totalToProcess}`);
  }

  // Mark complete
  await dbPool.query(
    'UPDATE generation_jobs SET status = $1, current_offset = $2 WHERE id = $3',
    ['Complete', totalToProcess, jobId],
  );

  return { jobId, processed: totalToProcess, status: 'Complete' };
}

/**
 * Placeholder report generator: reads the database clock, waits 2 s, and returns
 * a made-up report path under `/tmp`.
 *
 * @param {object} data - Job payload (currently unused).
 * @returns {Promise<{ generated: boolean, timestamp: *, filePath: string }>}
 */
async function handleReportGeneration(data) {
  // Fetch data from Postgres
  const { rows } = await dbPool.query('SELECT NOW() as now');

  // Simulate heavy report generation
  await sleep(2000);

  return {
    generated: true,
    timestamp: rows[0].now,
    filePath: `/tmp/report-${Date.now()}.pdf`,
  };
}

/**
 * Placeholder sitemap sync: counts the site's pages and returns the sitemap URL.
 *
 * @param {{ domain: string, siteId: * }} data - Job payload.
 * @returns {Promise<{ domain: string, pagesProcessed: number, sitemapUrl: string }>}
 */
async function handleSitemapSync(data) {
  const { domain, siteId } = data;
  console.log(`[Sitemap Sync] Processing domain: ${domain}`);

  // Fetch pages for the site
  const { rows: pages } = await dbPool.query(
    'SELECT slug FROM pages WHERE site_id = $1',
    [siteId],
  );

  // Simulate sitemap generation
  await sleep(1000);

  return {
    domain,
    pagesProcessed: pages.length,
    sitemapUrl: `https://${domain}/sitemap.xml`,
  };
}

/**
 * Placeholder campaign send: verifies the campaign exists, waits 3 s, and
 * returns fixed send counts.
 *
 * @param {{ campaignId: *, listId: * }} data - Job payload.
 * @returns {Promise<{ campaignId: *, sent: number, failed: number, status: string }>}
 * @throws {Error} When no `campaign_masters` row has id `campaignId`.
 */
async function handleCampaignBlast(data) {
  const { campaignId, listId } = data;
  console.log(`[Campaign Blast] Campaign: ${campaignId}, List: ${listId}`);

  // Fetch campaign details
  const { rows: campaigns } = await dbPool.query(
    'SELECT * FROM campaign_masters WHERE id = $1',
    [campaignId],
  );
  if (campaigns.length === 0) {
    throw new Error(`Campaign ${campaignId} not found`);
  }

  // Simulate sending
  await sleep(3000);

  return { campaignId, sent: 100, failed: 0, status: 'complete' };
}

/**
 * Placeholder post refactor: steps through `config.total_posts` posts (10 when
 * unset) one at a time, persisting the running count in `generation_jobs`.
 *
 * @param {{ jobId: *, siteUrl: string }} data - Job payload. Any other fields
 *   (e.g. an auth token) are not read by this placeholder.
 * @returns {Promise<{ jobId: *, processed: number, status: string }>}
 * @throws {Error} When no `generation_jobs` row has id `jobId`.
 */
async function handlePostRefactor(data) {
  const { jobId, siteUrl } = data;
  console.log(`[Post Refactor] Job: ${jobId}, Site: ${siteUrl}`);

  // Fetch job config
  const { rows: jobs } = await dbPool.query(
    'SELECT * FROM generation_jobs WHERE id = $1',
    [jobId],
  );
  if (jobs.length === 0) {
    throw new Error(`Job ${jobId} not found`);
  }
  const job = jobs[0];
  const config = job.config || {};

  // Update status
  await dbPool.query(
    'UPDATE generation_jobs SET status = $1 WHERE id = $2',
    ['Processing', jobId],
  );

  // Process posts (placeholder)
  const totalPosts = config.total_posts || 10;
  let processed = 0;

  while (processed < totalPosts) {
    await sleep(500);
    processed++;
    await dbPool.query(
      'UPDATE generation_jobs SET current_offset = $1 WHERE id = $2',
      [processed, jobId],
    );
  }

  await dbPool.query(
    'UPDATE generation_jobs SET status = $1 WHERE id = $2',
    ['Complete', jobId],
  );

  return { jobId, processed, status: 'Complete' };
}
// =============================================================================
// 6. WORKER INSTANTIATION
// =============================================================================
const worker = new Worker(QUEUE_NAME, processJob, {
  connection: redisConnection,
  concurrency: 5, // How many jobs to process in parallel per worker instance
  limiter: {
    max: 10, // Max 10 jobs
    duration: 1000, // per 1 second (Rate limiting)
  },
});

// =============================================================================
// 7. EVENT LISTENERS
// =============================================================================
worker.on('completed', (job, returnvalue) => {
  console.log(`✅ [Job ${job.id}] Completed! Result:`, returnvalue);
});

worker.on('failed', (job, error) => {
  // BullMQ types the job argument of 'failed' as possibly undefined, so every
  // access is guarded rather than just the ones used for the database row.
  const jobId = job?.id || 'unknown';
  const jobName = job?.name || 'unknown';

  console.error(`❌ [Job ${jobId}] Failed: ${error.message}`);

  // Log failed jobs to database (best effort - never throw from a listener)
  dbPool
    .query(
      `
      INSERT INTO work_log (action, entity_type, entity_id, details, timestamp)
      VALUES ($1, $2, $3, $4, NOW())
      `,
      ['job_failed', jobName, jobId, JSON.stringify({ error: error.message })],
    )
    .catch((logError) => {
      console.warn(`[Job ${jobId}] Could not write work_log entry: ${logError.message}`);
    });
});

worker.on('error', (err) => {
  console.error('💀 [Worker] Critical Error:', err);
});

worker.on('ready', () => {
  console.log('🔱 [God Mode Worker] Ready and waiting for jobs...');
});

// =============================================================================
// 8. GRACEFUL SHUTDOWN
// =============================================================================
// Essential for Kubernetes/Docker to prevent data corruption on restart

// Set once shutdown has started so a repeated signal (e.g. a second Ctrl+C)
// does not close the worker, Redis connection and pool a second time.
let isShuttingDown = false;

/**
 * Close the worker, the Redis connection and the database pool in that order,
 * then exit the process (code 0 on success, 1 if any step throws).
 *
 * @param {string} signal - Name of the signal that triggered the shutdown (for logging).
 * @returns {Promise<void>}
 */
const gracefulShutdown = async (signal) => {
  if (isShuttingDown) {
    return;
  }
  isShuttingDown = true;

  console.log(`\n🛑 [Worker] Received ${signal}. Shutting down gracefully...`);

  try {
    await worker.close();
    console.log(' ✓ Worker closed');

    await redisConnection.quit();
    console.log(' ✓ Redis disconnected');

    await dbPool.end();
    console.log(' ✓ Database pool closed');

    console.log('👋 [God Mode Worker] Goodbye.');
    process.exit(0);
  } catch (error) {
    console.error('Error during shutdown:', error);
    process.exit(1);
  }
};

process.on('SIGINT', () => gracefulShutdown('SIGINT'));
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));

// Unhandled rejection handler
process.on('unhandledRejection', (reason, promise) => {
  console.error('Unhandled Rejection at:', promise, 'reason:', reason);
});