con un clic
worker-development
// Create and manage background workers, BullMQ queues, Kafka consumers, and scheduled jobs. Use when adding new workers, creating queues, defining job types, building scheduled tasks, or working with async processing.
// Create and manage background workers, BullMQ queues, Kafka consumers, and scheduled jobs. Use when adding new workers, creating queues, defining job types, building scheduled tasks, or working with async processing.
Create and modify integration channels (messenger, whatsapp, zalo, webchat, etc.) for the chatbot platform. Use when adding a new channel integration, modifying webhook handlers, working with message send/receive, or connecting external platforms.
Scaffold new features following project conventions for the builder app. Use when creating a new feature, page, component, server action, query, or adding a new section to the web application.
Create and modify oRPC API routers, procedures, and middleware for the builder app. Use when adding API endpoints, creating routers, defining procedures, working with oRPC middleware, or building OpenAPI routes.
Manage turborepo monorepo development workflow including dev servers, builds, linting, and package management. Use when running dev, build, lint, deploy, or managing workspace packages in this pnpm + turbo monorepo.
Work with Drizzle ORM database schema, migrations, relations, and queries in PostgreSQL. Use when creating tables, modifying schema, writing migrations, defining relations, or querying the database.
| name | worker-development |
| description | Create and manage background workers, BullMQ queues, Kafka consumers, and scheduled jobs. Use when adding new workers, creating queues, defining job types, building scheduled tasks, or working with async processing. |
Workers run as separate Node processes in apps/worker/. They consume jobs from BullMQ queues (Redis-backed) or Kafka topics.
Shared config lives in packages/worker-config/ (@chatbotx.io/worker-config).
| Worker | Queue/Topic | Entry |
|---|---|---|
| integration | integration | src/integration/worker.ts |
| chat | chat | src/chat/worker.ts |
| ai-agent | aiAgent | src/ai-agent/worker.ts |
| default | default | src/default/worker.ts |
| trigger | trigger | src/trigger/worker.ts |
| webhook | webhook | src/webhook/worker.ts |
| schedule | (cron) | src/schedule/worker.ts |
| sequence-scheduler | Kafka | src/sequence-scheduler/worker*.ts |
Add to packages/worker-config/src/lib/types.ts:
export const queueName = {
// ...existing
myQueue: "myQueue",
} as const
Create packages/worker-config/src/queues/<name>/index.ts:
import { Queue } from "bullmq"
import { getRedisConnection, defaultJobOptions, fakeQueue } from "../../lib/connection"
export enum MyQueueJobAction {
processItem = "processItem",
syncData = "syncData",
}
type ProcessItemJob = {
type: MyQueueJobAction.processItem
data: { itemId: string; workspaceId: string }
}
type SyncDataJob = {
type: MyQueueJobAction.syncData
data: { source: string }
}
export type MyQueueJobData = ProcessItemJob | SyncDataJob
const NEXT_PHASE = process.env.NEXT_PHASE
export const myQueue =
NEXT_PHASE === "phase-production-build"
? fakeQueue
: new Queue(queueNames.enum.myQueue, {
connection: getRedisConnection(),
defaultJobOptions,
})
Add to packages/worker-config/src/index.ts:
export * from "./queues/<name>"
Create apps/worker/src/<domain>/worker.ts:
import { Worker, type Job } from "bullmq"
import { queueNames } from "@chatbotx.io/worker-config"
import {
getRedisConnection,
defaultWorkerOptions,
} from "@chatbotx.io/worker-config"
import type { MyQueueJobData } from "@chatbotx.io/worker-config"
import { MyQueueJobAction } from "@chatbotx.io/worker-config"
import { ensureBootstrapped } from "../lib/bootstrap"
import { logger } from "@chatbotx.io/logger"
const startMyWorker = async () => {
try {
await ensureBootstrapped()
logger.info("My worker bootstrapped successfully")
} catch (err) {
logger.error(err, "Failed to bootstrap my worker")
process.exit(1)
}
const worker = new Worker(
queueNames.enum.myQueue,
async (job: Job<MyQueueJobData>) => {
logger.info(job.data, "Worker received job")
switch (job.data.type) {
case MyQueueJobAction.processItem:
return await handleProcessItem(job.data.data)
case MyQueueJobAction.syncData:
return await handleSyncData(job.data.data)
default:
return
}
},
{
connection: getRedisConnection(),
...defaultWorkerOptions,
},
)
worker.on("failed", (job, err) => {
logger.error({ err, jobId: job?.id }, "Job failed")
})
worker.on("completed", (job) => {
logger.info({ jobId: job.id }, "Job completed")
})
}
startMyWorker()
Add to apps/worker/tsdown.config.ts:
entry: [
// ...existing
"src/<domain>/worker.ts",
]
Add to apps/worker/package.json:
{
"scripts": {
"worker:<domain>": "dotenv -e ../../.env -- tsx --watch src/<domain>/worker.ts"
}
}
Include in the dev script's concurrently list if needed.
From builder or other apps:
import { myQueue, MyQueueJobAction } from "@chatbotx.io/worker-config"
await myQueue.add("processItem", {
type: MyQueueJobAction.processItem,
data: { itemId: "123", workspaceId: "456" },
})
// Bulk enqueue
await myQueue.addBulk([
{
name: "syncData",
data: { type: MyQueueJobAction.syncData, data: { source: "api" } },
},
])
The schedule worker (src/schedule/worker.ts) runs periodic tasks. Add cron-style scheduled work there or use BullMQ's repeatable jobs:
await myQueue.add(
"syncData",
{ type: MyQueueJobAction.syncData, data: { source: "cron" } },
{ repeat: { pattern: "0 */6 * * *" } }, // every 6 hours
)
For high-throughput scenarios, the project uses Kafka:
createProducer from @chatbotx.io/kafkacreateConsumer from @chatbotx.io/kafkaOnly used for sequence dispatch currently. Prefer BullMQ for standard job queues.
| What | Import from |
|---|---|
| Queue names, job types | @chatbotx.io/worker-config |
| Redis connection, options | @chatbotx.io/worker-config |
| Database | @chatbotx.io/database/client |
| Integration handlers | ../services/integrations (within worker) |
| Logger | @chatbotx.io/logger |
| SDK types | @chatbotx.io/sdk |
Import the child logger from ../../lib/logger inside apps/worker, or @chatbotx.io/logger for shared packages.
Always use err (not error) as the key for Error objects.
pino's built-in serializer is keyed on err. Using any other key (e.g. error) skips serialization, so the stack trace and error message are lost from structured logs.
// ✅ correct — stack trace preserved
logger.error({ err: error, conversationId }, "Failed to emit analytics event")
// ❌ wrong — object serialized as [Object] with no stack trace
logger.error({ error, conversationId }, "Failed to emit analytics event")
For fire-and-forget .catch() handlers on analytics/event-bus emissions, always log the error rather than swallowing it:
emit("analytics:dashboard", payload).catch((err) => {
logger.error({ err, conversationId }, "[handler] Failed to emit analytics event")
})
dotenv -e ../../.env (root .env file)packages/worker-config/src/keys.tsfakeQueue stubs are used during Next.js production build to avoid Redis requirement