God Mode Valhalla: Initial Standalone Commit
src/lib/queue/BatchProcessor.ts (new file, 51 lines)
@@ -0,0 +1,51 @@
interface BatchConfig {
  batchSize: number;   // How many items to grab at once (e.g. 100)
  concurrency: number; // How many to process in parallel (e.g. 5)
  delayMs: number;     // Cool-down between batches (e.g. wait 100ms after each batch)
}

export class BatchProcessor {
  constructor(private config: BatchConfig) {}

  async processQueue(
    items: any[],
    workerFunction: (item: any) => Promise<any>
  ) {
    const results: any[] = [];

    // Process in chunks (batch size)
    for (let i = 0; i < items.length; i += this.config.batchSize) {
      const chunk = items.slice(i, i + this.config.batchSize);
      console.log(`Processing Batch ${(i / this.config.batchSize) + 1}...`);

      // Within each chunk, limit concurrency
      const chunkResults = await this.runWithConcurrency(chunk, workerFunction);
      results.push(...chunkResults);

      // Optional: cool down between batches
      if (this.config.delayMs > 0) {
        await new Promise(r => setTimeout(r, this.config.delayMs));
      }
    }
    return results;
  }

  private async runWithConcurrency(items: any[], fn: (item: any) => Promise<any>) {
    const results: Promise<any>[] = [];
    const executing: Promise<any>[] = [];

    for (const item of items) {
      const p = Promise.resolve().then(() => fn(item));
      results.push(p);

      // Only throttle when the concurrency limit is smaller than the chunk
      if (this.config.concurrency <= items.length) {
        // Each in-flight promise removes itself from the pool once it settles
        const e: Promise<any> = p.then(() => executing.splice(executing.indexOf(e), 1));
        executing.push(e);

        // Once the pool is full, wait for the fastest promise to finish
        if (executing.length >= this.config.concurrency) {
          await Promise.race(executing);
        }
      }
    }
    return Promise.all(results);
  }
}
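For context, a minimal usage sketch (not part of the commit) showing how this class might be driven. The fetchPage helper and URL list are hypothetical stand-ins for any async worker function, and a runtime with a global fetch (Node 18+) is assumed:

import { BatchProcessor } from './BatchProcessor';

async function main() {
  const processor = new BatchProcessor({ batchSize: 100, concurrency: 5, delayMs: 100 });

  // Hypothetical worker: fetch each URL and report its HTTP status.
  const fetchPage = async (url: string): Promise<number> => {
    const res = await fetch(url);
    return res.status;
  };

  const urls = ['https://example.com/a', 'https://example.com/b'];
  const statuses = await processor.processQueue(urls, fetchPage);
  console.log(statuses); // e.g. [200, 200]
}

main().catch(console.error);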
src/lib/queue/config.ts (new file, 44 lines)
@@ -0,0 +1,44 @@
/**
 * BullMQ Configuration
 * Job queue setup for content generation
 */

import { Queue, Worker, QueueOptions } from 'bullmq';
import IORedis from 'ioredis';

// Redis connection (BullMQ requires maxRetriesPerRequest: null for its blocking commands)
const connection = new IORedis({
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379', 10),
  maxRetriesPerRequest: null,
});

// Queue options
const queueOptions: QueueOptions = {
  connection,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000,
    },
    removeOnComplete: {
      count: 100, // keep at most 100 completed jobs
      age: 3600,  // or drop them after one hour
    },
    removeOnFail: {
      count: 1000,
    },
  },
};

// Define queues
export const queues = {
  generation: new Queue('generation', queueOptions),
  publishing: new Queue('publishing', queueOptions),
  svgImages: new Queue('svg-images', queueOptions),
  wpSync: new Queue('wp-sync', queueOptions),
  cleanup: new Queue('cleanup', queueOptions),
};

export { connection };
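For context, a minimal consumption sketch (not part of the commit): enqueue a job on the generation queue and process it with a BullMQ Worker. The job name 'article' and the payload shape are hypothetical; queues and connection come from config.ts above.

import { Worker } from 'bullmq';
import { queues, connection } from './config';

// Consumer side (typically a separate process): handle jobs from the 'generation' queue.
const worker = new Worker(
  'generation',
  async (job) => {
    // The payload shape is whatever the producer enqueued; 'topic' is hypothetical.
    console.log(`Generating content for topic: ${job.data.topic}`);
    return { ok: true };
  },
  { connection }
);

worker.on('completed', (job) => console.log(`Job ${job.id} completed`));
worker.on('failed', (job, err) => console.error(`Job ${job?.id} failed: ${err.message}`));

// Producer side: enqueue a hypothetical content-generation job.
async function enqueueExample() {
  await queues.generation.add('article', { topic: 'batch processing' });
}

enqueueExample().catch(console.error);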