252 lines
7.7 KiB
TypeScript
252 lines
7.7 KiB
TypeScript
// BullMQ Worker: Content Generator
|
|
import { Worker } from 'bullmq';
|
|
import { pool } from '../lib/db';
|
|
import { redisConnection } from '../lib/queue/config';
|
|
import { SpintaxResolver, expandVariables, generateCartesianProduct } from '../lib/spintax/resolver';
|
|
import crypto from 'crypto';
|
|
|
|
/**
 * Payload read from `job.data` by the `generate_campaign_content` processor.
 */
interface GenerateCampaignJob {
  /** Matched against `campaign_masters.id` when the campaign is loaded. */
  campaignId: string;
  /** Campaign name; the processor only uses it in log output. */
  campaignName: string;
}
|
|
|
|
/**
 * Main content generation worker.
 *
 * Consumes jobs from the `generate_campaign_content` queue. For each job it
 * loads the campaign row from `campaign_masters`, builds the cartesian product
 * of the blueprint variables, generates one article per combination and then
 * marks the campaign `completed`.
 *
 * The processor resolves to `{ created, skipped, total }`. A failure for a
 * single combination is logged and counted as skipped. Any other failure
 * marks the campaign `failed` (best effort) and rethrows the original error
 * so the job itself fails with the real cause.
 */
export const contentGeneratorWorker = new Worker(
  'generate_campaign_content',
  async (job) => {
    const { campaignId, campaignName } = job.data as GenerateCampaignJob;

    console.log(`\n🔱 Processing campaign: ${campaignName} (${campaignId})`);

    try {
      // 1. Fetch campaign blueprint
      const campaignResult = await pool.query(
        `SELECT id, blueprint_json, site_id FROM campaign_masters WHERE id = $1`,
        [campaignId]
      );

      if (campaignResult.rows.length === 0) {
        throw new Error(`Campaign ${campaignId} not found`);
      }

      const campaign = campaignResult.rows[0];
      const blueprint = campaign.blueprint_json;
      const siteId = campaign.site_id;

      console.log(`📋 Blueprint loaded: ${blueprint.asset_name}`);
      console.log(`🔢 Variables: ${Object.keys(blueprint.variables).join(', ')}`);

      // 2. Generate cartesian product of all variables
      const combinations = generateCartesianProduct(blueprint.variables);
      console.log(`🎯 Generated ${combinations.length} unique combinations`);

      let created = 0;
      let skipped = 0;

      // 3. For each combination, generate article
      for (const vars of combinations) {
        try {
          const articleHtml = await generateArticle(blueprint, vars, campaignId, siteId);

          if (articleHtml) {
            created++;
            console.log(`✅ Created: ${vars.CITY}, ${vars.STATE} (${created}/${combinations.length})`);
          } else {
            // generateArticle returns null for an already-registered variation.
            skipped++;
          }
        } catch (err) {
          // One bad combination must not abort the whole campaign.
          console.error(`❌ Failed for ${vars.CITY}:`, err);
          skipped++;
        }

        // Update progress (percentage of combinations handled so far)
        await job.updateProgress((created + skipped) / combinations.length * 100);
      }

      // 4. Mark campaign complete
      await pool.query(
        `UPDATE campaign_masters SET status = 'completed', updated_at = NOW() WHERE id = $1`,
        [campaignId]
      );

      console.log(`\n🎉 Campaign complete! Created: ${created}, Skipped: ${skipped}\n`);

      return { created, skipped, total: combinations.length };
    } catch (error) {
      console.error(`💥 Campaign processing failed:`, error);

      // Recording the failure is best effort: if this query rejects too (e.g.
      // the database is the thing that is down), its error must not replace
      // the original one that is rethrown below.
      try {
        await pool.query(
          `UPDATE campaign_masters SET status = 'failed', updated_at = NOW() WHERE id = $1`,
          [campaignId]
        );
      } catch (statusError) {
        console.error(`⚠️ Could not mark campaign ${campaignId} as failed:`, statusError);
      }

      throw error;
    }
  },
  {
    connection: redisConnection,
    concurrency: 2
  }
);
|
|
|
|
/**
 * Generate a single article from blueprint + variables and persist it.
 *
 * Expands `{{VARIABLES}}` and resolves spintax for every block in
 * `blueprint.content.body`, derives a variation hash from the spintax choices
 * and the variables, and, unless that hash is already present in
 * `variation_registry`, inserts a published row into `posts` plus a matching
 * `variation_registry` row.
 *
 * @param blueprint  Campaign blueprint; `content.body`, `asset_name` and
 *                   `meta_description` are read from it.
 * @param vars       Variable values for this combination.
 * @param campaignId Stored on the `variation_registry` row.
 * @param siteId     Stored on the `posts` row.
 * @returns The generated HTML, or `null` when the variation already exists.
 */
async function generateArticle(
  blueprint: any,
  vars: Record<string, string>,
  campaignId: string,
  siteId: string
): Promise<string | null> {
  // Create unique seed for this variation
  const seed = JSON.stringify(vars);
  const resolver = new SpintaxResolver(seed);

  // 1. Expand variable placeholders
  let fullHtml = '';
  const blocks = blueprint.content.body;

  for (const block of blocks) {
    let blockContent = block.content;

    // Replace {{VARIABLES}}
    blockContent = expandVariables(blockContent, vars);

    // Resolve spintax {A|B|C}
    blockContent = resolver.resolve(blockContent);

    fullHtml += blockContent + '\n\n';
  }

  // 2. Generate variation hash for uniqueness check.
  // The choices hash is taken here, before the title and meta description are
  // resolved; keep this order so hashes stay comparable with rows already in
  // variation_registry.
  const variationHash = resolver.getChoicesHash();
  const varsHash = crypto.createHash('sha256')
    .update(JSON.stringify(vars))
    .digest('hex')
    .substring(0, 32);
  const combinedHash = crypto.createHash('sha256')
    .update(variationHash + varsHash)
    .digest('hex')
    .substring(0, 64);

  // 3. Check if this variation already exists
  const existingCheck = await pool.query(
    `SELECT id FROM variation_registry WHERE variation_hash = $1`,
    [combinedHash]
  );

  if (existingCheck.rows.length > 0) {
    console.log(`⏭️ Skipping duplicate: ${vars.CITY}`);
    return null;
  }

  // 4. Generate title and slug
  const title = expandVariables(resolver.resolve(blueprint.asset_name), vars);
  const slug = generateSlug(vars);

  // 5. Generate meta description
  const metaDesc = expandVariables(
    resolver.resolve(blueprint.meta_description),
    vars
  );

  // 6. Insert post (the excerpt is the meta description capped at 300 chars)
  const postResult = await pool.query(
    `INSERT INTO posts (
      site_id, title, slug, content, excerpt, status,
      published_at, created_at
    ) VALUES ($1, $2, $3, $4, $5, 'published', NOW(), NOW())
    RETURNING id`,
    [
      siteId,
      title,
      slug,
      fullHtml,
      metaDesc.substring(0, 300)
    ]
  );

  const postId = postResult.rows[0].id;

  // 7. Record the variation
  await pool.query(
    `INSERT INTO variation_registry (
      campaign_id, variation_hash, resolved_variables, spintax_choices, post_id
    ) VALUES ($1, $2, $3, $4, $5)`,
    [
      campaignId,
      combinedHash,
      JSON.stringify(vars),
      JSON.stringify(resolver.getChoices()),
      postId
    ]
  );

  // 8. Update block usage stats.
  // The post and registry rows are already written at this point, so a stats
  // failure is logged instead of thrown; otherwise the caller would count a
  // persisted article as failed.
  try {
    await updateBlockUsageStats(blocks, resolver.getChoices());
  } catch (statsError) {
    console.error(`⚠️ Usage stats update failed for post ${postId}:`, statsError);
  }

  return fullHtml;
}
|
|
|
|
/**
 * Record usage statistics for blueprint blocks and spintax choices.
 *
 * Issues one upsert into `block_usage_stats` per entry in `blocks`, followed
 * by one insert into `spintax_variation_stats` per entry in `choices`. The
 * queries are awaited one at a time, in input order.
 *
 * @param blocks  Blueprint blocks; `block_type` (a string) is read from each.
 * @param choices Recorded spintax choices; `path` and `chosen` are read from each.
 */
async function updateBlockUsageStats(blocks: any[], choices: any[]) {
  // Matches a trailing " (N)" suffix on a block type, e.g. "intro (2)".
  const numberedSuffix = / \(\d+\)$/;

  const blockUsageUpsert = `INSERT INTO block_usage_stats (content_fragment_id, block_type, total_uses, last_used_at)
    SELECT id, $1, 1, NOW()
    FROM content_fragments
    WHERE block_type = $1
    ON CONFLICT (content_fragment_id) DO UPDATE SET
      total_uses = block_usage_stats.total_uses + 1,
      last_used_at = NOW()`;

  // NOTE(review): this SELECT has no WHERE clause linking the choice to a
  // fragment, and LIMIT 1 without ORDER BY picks an unspecified row. Together
  // with ON CONFLICT DO NOTHING, use_count is never incremented on repeats.
  // Confirm this is intended.
  const spintaxChoiceInsert = `INSERT INTO spintax_variation_stats (
      content_fragment_id, variation_path, variation_text, use_count, last_used_at
    )
    SELECT cf.id, $1, $2, 1, NOW()
    FROM content_fragments cf
    LIMIT 1
    ON CONFLICT DO NOTHING`;

  // Track which blocks were used
  for (const entry of blocks) {
    const baseType = entry.block_type.replace(numberedSuffix, '');
    await pool.query(blockUsageUpsert, [baseType]);
  }

  // Track spintax variation usage
  for (const pick of choices) {
    const params = [pick.path, pick.chosen];
    await pool.query(spintaxChoiceInsert, params);
  }
}
|
|
|
|
/**
 * Generate a URL-safe slug of the form `<city>-<state>-<timestamp>`.
 *
 * Each segment is lower-cased, has its diacritics folded to plain letters
 * ("São" -> "sao"), has every other run of non-alphanumerics collapsed into a
 * single hyphen, and is trimmed of leading/trailing hyphens so punctuation at
 * the edges ("St.") cannot produce "--" in the final slug. A missing value, or
 * one with no usable characters, falls back to `city` / `state`.
 *
 * @param vars Variable values; only `CITY` and `STATE` are read.
 * @returns The slug, suffixed with `Date.now()` in milliseconds.
 */
function generateSlug(vars: Record<string, string>): string {
  const toSegment = (value: string | undefined, fallback: string): string => {
    const cleaned = (value ?? '')
      .normalize('NFKD')
      // Drop the combining marks left behind by the decomposition above.
      .replace(/[\u0300-\u036f]/g, '')
      .toLowerCase()
      .replace(/[^a-z0-9]+/g, '-')
      .replace(/^-+|-+$/g, '');
    // Empty string is deliberately treated like a missing value.
    return cleaned || fallback;
  };

  const city = toSegment(vars.CITY, 'city');
  const state = toSegment(vars.STATE, 'state');
  return `${city}-${state}-${Date.now()}`;
}
|
|
|
|
// Worker lifecycle logging: one console line per completed job, per failed
// job, and per worker-level error.
contentGeneratorWorker.on('completed', (job) =>
  console.log(`✅ Job ${job.id} completed:`, job.returnvalue)
);

// `job` is accessed with optional chaining because it may be absent here.
contentGeneratorWorker.on('failed', (job, err) =>
  console.error(`❌ Job ${job?.id} failed:`, err)
);

contentGeneratorWorker.on('error', (err) =>
  console.error('💥 Worker error:', err)
);

// Logged once at module load, after the listeners above are attached.
console.log('🚀 Content Generator Worker started');
|