Files
mini/scripts/start-worker.js

322 lines
10 KiB
JavaScript

#!/usr/bin/env node
/**
* 🔱 GOD MODE WORKER
* ==================
* BullMQ worker for background job processing.
* Connects to Redis for queue management and PostgreSQL for data operations.
*
* Usage:
* npm run worker
*
* Production (PM2):
* pm2 start scripts/start-worker.js --name "god-mode-worker"
*/
import { Worker } from 'bullmq';
import IORedis from 'ioredis';
import pg from 'pg';
// =============================================================================
// 1. CONFIGURATION
// =============================================================================
// Ensure these match your Docker/Environment variables
const REDIS_URL = process.env.REDIS_URL || 'redis://127.0.0.1:6379';
const DATABASE_URL = process.env.DATABASE_URL || 'postgres://postgres:postgres@localhost:5432/spark';
// Queue name must match what you define in your Astro API routes
const QUEUE_NAME = 'god-mode-queue';
// =============================================================================
// 2. DATABASE CONNECTION (Singleton Pool)
// =============================================================================
// We create a pool here so the worker can talk to Postgres directly
const dbPool = new pg.Pool({
connectionString: DATABASE_URL,
max: 5, // Keep connection count low for workers to save DB resources
idleTimeoutMillis: 30000,
});
// =============================================================================
// 3. REDIS CONNECTION
// =============================================================================
// We use a specific connection for the worker to avoid blocking the main app
const redisConnection = new IORedis(REDIS_URL, {
maxRetriesPerRequest: null, // Required by BullMQ - prevents crashes on Redis hiccups
});
console.log(`🔱 [God Mode Worker] Starting up... listening to queue: "${QUEUE_NAME}"`);
// =============================================================================
// 4. THE JOB PROCESSOR
// =============================================================================
// This function runs every time a job enters the queue.
const processJob = async (job) => {
console.log(`[Job ${job.id}] Processing ${job.name}...`);
// Track execution time for monitoring
const start = Date.now();
try {
switch (job.name) {
case 'generate-content':
return await handleContentGeneration(job.data);
case 'generate-report':
return await handleReportGeneration(job.data);
case 'sync-sitemap':
return await handleSitemapSync(job.data);
case 'campaign-blast':
return await handleCampaignBlast(job.data);
case 'refactor-posts':
return await handlePostRefactor(job.data);
default:
throw new Error(`Unknown job name: ${job.name}`);
}
} finally {
const duration = Date.now() - start;
console.log(`[Job ${job.id}] Finished in ${duration}ms`);
// Log to work_log table
try {
await dbPool.query(`
INSERT INTO work_log (action, entity_type, entity_id, details, timestamp)
VALUES ($1, $2, $3, $4, NOW())
`, ['job_complete', job.name, job.id, JSON.stringify({ duration, result: 'success' })]);
} catch (e) {
// Silent fail on logging - don't crash the job
}
}
};
// =============================================================================
// 5. JOB HANDLERS
// =============================================================================
async function handleContentGeneration(data) {
const { jobId, batchSize = 5, mode = 'generate' } = data;
console.log(`[Content Generation] Job ${jobId}, batch size: ${batchSize}, mode: ${mode}`);
// Fetch job details from database
const { rows: jobs } = await dbPool.query(
'SELECT * FROM generation_jobs WHERE id = $1',
[jobId]
);
if (jobs.length === 0) {
throw new Error(`Job ${jobId} not found`);
}
const job = jobs[0];
// Update job status to processing
await dbPool.query(
'UPDATE generation_jobs SET status = $1 WHERE id = $2',
['Processing', jobId]
);
// Process in batches (placeholder for actual generation logic)
const totalToProcess = job.target_quantity || 10;
let processed = job.current_offset || 0;
while (processed < totalToProcess) {
// Simulate batch processing
await new Promise(resolve => setTimeout(resolve, 1000));
processed += batchSize;
// Update progress
await dbPool.query(
'UPDATE generation_jobs SET current_offset = $1 WHERE id = $2',
[Math.min(processed, totalToProcess), jobId]
);
console.log(`[Content Generation] Progress: ${processed}/${totalToProcess}`);
}
// Mark complete
await dbPool.query(
'UPDATE generation_jobs SET status = $1, current_offset = $2 WHERE id = $3',
['Complete', totalToProcess, jobId]
);
return { jobId, processed: totalToProcess, status: 'Complete' };
}
async function handleReportGeneration(data) {
// Fetch data from Postgres
const { rows } = await dbPool.query('SELECT NOW() as now');
// Simulate heavy report generation
await new Promise(resolve => setTimeout(resolve, 2000));
return {
generated: true,
timestamp: rows[0].now,
filePath: `/tmp/report-${Date.now()}.pdf`
};
}
async function handleSitemapSync(data) {
const { domain, siteId } = data;
console.log(`[Sitemap Sync] Processing domain: ${domain}`);
// Fetch pages for the site
const { rows: pages } = await dbPool.query(
'SELECT slug FROM pages WHERE site_id = $1',
[siteId]
);
// Simulate sitemap generation
await new Promise(resolve => setTimeout(resolve, 1000));
return {
domain,
pagesProcessed: pages.length,
sitemapUrl: `https://${domain}/sitemap.xml`
};
}
async function handleCampaignBlast(data) {
const { campaignId, listId } = data;
console.log(`[Campaign Blast] Campaign: ${campaignId}, List: ${listId}`);
// Fetch campaign details
const { rows: campaigns } = await dbPool.query(
'SELECT * FROM campaign_masters WHERE id = $1',
[campaignId]
);
if (campaigns.length === 0) {
throw new Error(`Campaign ${campaignId} not found`);
}
// Simulate sending
await new Promise(resolve => setTimeout(resolve, 3000));
return {
campaignId,
sent: 100,
failed: 0,
status: 'complete'
};
}
async function handlePostRefactor(data) {
const { jobId, siteUrl, authToken } = data;
console.log(`[Post Refactor] Job: ${jobId}, Site: ${siteUrl}`);
// Fetch job config
const { rows: jobs } = await dbPool.query(
'SELECT * FROM generation_jobs WHERE id = $1',
[jobId]
);
if (jobs.length === 0) {
throw new Error(`Job ${jobId} not found`);
}
const job = jobs[0];
const config = job.config || {};
// Update status
await dbPool.query(
'UPDATE generation_jobs SET status = $1 WHERE id = $2',
['Processing', jobId]
);
// Process posts (placeholder)
const totalPosts = config.total_posts || 10;
let processed = 0;
while (processed < totalPosts) {
await new Promise(resolve => setTimeout(resolve, 500));
processed++;
await dbPool.query(
'UPDATE generation_jobs SET current_offset = $1 WHERE id = $2',
[processed, jobId]
);
}
await dbPool.query(
'UPDATE generation_jobs SET status = $1 WHERE id = $2',
['Complete', jobId]
);
return { jobId, processed, status: 'Complete' };
}
// =============================================================================
// 6. WORKER INSTANTIATION
// =============================================================================
const worker = new Worker(QUEUE_NAME, processJob, {
connection: redisConnection,
concurrency: 5, // How many jobs to process in parallel per worker instance
limiter: {
max: 10, // Max 10 jobs
duration: 1000 // per 1 second (Rate limiting)
}
});
// =============================================================================
// 7. EVENT LISTENERS
// =============================================================================
worker.on('completed', (job, returnvalue) => {
console.log(`✅ [Job ${job.id}] Completed! Result:`, returnvalue);
});
worker.on('failed', (job, error) => {
console.error(`❌ [Job ${job.id}] Failed: ${error.message}`);
// Log failed jobs to database
dbPool.query(`
INSERT INTO work_log (action, entity_type, entity_id, details, timestamp)
VALUES ($1, $2, $3, $4, NOW())
`, ['job_failed', job?.name || 'unknown', job?.id || 'unknown', JSON.stringify({ error: error.message })])
.catch(() => { }); // Silent fail
});
worker.on('error', (err) => {
console.error('💀 [Worker] Critical Error:', err);
});
worker.on('ready', () => {
console.log('🔱 [God Mode Worker] Ready and waiting for jobs...');
});
// =============================================================================
// 8. GRACEFUL SHUTDOWN
// =============================================================================
// Essential for Kubernetes/Docker to prevent data corruption on restart
const gracefulShutdown = async (signal) => {
console.log(`\n🛑 [Worker] Received ${signal}. Shutting down gracefully...`);
try {
await worker.close();
console.log(' ✓ Worker closed');
await redisConnection.quit();
console.log(' ✓ Redis disconnected');
await dbPool.end();
console.log(' ✓ Database pool closed');
console.log('👋 [God Mode Worker] Goodbye.');
process.exit(0);
} catch (error) {
console.error('Error during shutdown:', error);
process.exit(1);
}
};
process.on('SIGINT', () => gracefulShutdown('SIGINT'));
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'));
// Unhandled rejection handler
process.on('unhandledRejection', (reason, promise) => {
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
});