with one click
with one click
Effect v4 模式指南。当需要创建 Effect 服务、定义错误类型、编写 Effect 程序、管理 Layer 组合、或使用 Effect 封装异步操作时使用
创建或修改一个新的 API tier。当用户请求“新增 tier / 创建 partner tier / 新增 merchant 端 / 新增 tenant 端 / 新增 API 端 / 新增一套路由层”时使用。目标是在不修改框架核心的前提下,为新 tier 补齐配置、中间件、类型别名、路由入口和测试。
创建或修改 CRUD 模块。当需要创建新的增删改查 API、修改现有路由模块、添加新字段、新增接口、或用户请求"创建/修改 XX 管理"时使用
创建或修改数据库 Schema。当需要创建新表、修改表结构、定义字段、设置索引约束、或涉及 Drizzle ORM / drizzle-zod 操作时使用
Drizzle ORM v1 关系查询指南。当需要定义 Relations v2、编写关系查询、使用 through 多对多、预定义过滤器、或从旧版 Drizzle 迁移时使用
| name | bullmq |
| description | 创建或修改 BullMQ 队列任务。当需要创建新队列、添加任务类型、注册 Worker、设置定时任务、或用户请求"添加后台任务/队列处理"时使用 |
| argument-hint | [queue-name/job-name] |
maxRetriesPerRequest: null)src/lib/
├── enums/bullmq.ts # 队列和任务名称枚举
├── infrastructure/
│ ├── bullmq/
│ │ └── job-registry.ts # 类型映射和 Zod 验证
│ ├── bullmq-adapter.ts # QueueManager 核心类
│ └── effect/services/bullmq.ts # Effect Layer
│ └── bootstrap.ts # Worker 注册位置
src/routes/admin/
└── queue-board.index.ts # Bull Board UI 路由
QueueJobsMapping 确保队列只能使用特定 job nameJobDefinitionRegistry 自动推断 job data 类型JobSchemaRegistry 使用 Zod 验证数据email、cleanup,不用 EMAIL_QUEUE)send-welcome、daily-cleanup)as const(不用 enum)[JobName.XXX]: Schema 形式2024-01-01T00:00:00Z)z.uuid() 验证Effect.tryPromise)Effect.syncError 类型Effect.sync(立即返回 Worker 实例)src/lib/enums/bullmq.ts 添加队列名称job-registry.ts 创建 QueueJobsMapping 映射(初始为 never)src/lib/enums/bullmq.ts 添加任务名称job-registry.ts 创建 Zod schemaJobDefinitionRegistry 类型映射JobSchemaRegistry 验证映射QueueJobsMapping 关联队列queueManager.registerWorkerJob<T> 类型)在业务代码中使用(无需创建文件):
import { Effect } from "effect";
import { queueManager } from "@/lib/infrastructure/bullmq-adapter";
import { JobName, QueueName } from "@/lib/enums/bullmq";
// 添加任务
const program = queueManager.addJob(
QueueName.EMAIL,
JobName.EMAIL_SEND_WELCOME,
{ email: "user@example.com", username: "testuser" },
{ priority: 1, delay: 5000 }, // 可选配置
);
await Effect.runPromise(program);
使用 Job Schedulers API(BullMQ v5.16.0+):
// 调度定时任务
const program = queueManager.scheduleJob(
QueueName.CLEANUP,
JobName.CLEANUP_DAILY,
{},
{ pattern: "0 0 * * *" }, // Cron 表达式
);
await Effect.runPromise(program);
// 移除定时任务
await Effect.runPromise(
queueManager.unscheduleJob(QueueName.CLEANUP, JobName.CLEANUP_DAILY),
);
包含:
QueueName 枚举定义JobName 枚举定义包含:
JobDefinitionRegistry 类型映射JobSchemaRegistry 验证映射QueueJobsMapping 队列关联包含:
包含:
参考 examples/email-cleanup-queues.md 查看完整的 Email 和 Cleanup 队列实现。
// 添加任务(Effect 封装 + Zod 验证)
addJob<Q, N>(
queueName: Q,
jobName: N,
data: JobDataByName<N>,
opts?: JobsOptions,
): Effect<Job>
// 注册 Worker(Effect 封装)
registerWorker<Q>(
queueName: Q,
processor: (job: Job) => Promise<any>,
options?: WorkerOptions,
): Effect<Worker>
// 调度定时任务(Effect 封装 + Zod 验证)
scheduleJob<Q, N>(
queueName: Q,
jobName: N,
data: JobDataByName<N>,
repeatOptions: RepeatOptions,
): Effect<void>
// 移除定时任务
unscheduleJob(queueName: string, jobName: string): Effect<boolean>
// 查询定时任务
getScheduledJobs(queueName: string): Effect<JobScheduler[]>
// 获取队列实例(用于 Bull Board)
getQueue(name: string): Queue
// 优雅关闭
close(timeoutMs?: number): Effect<void>
addJob 和 registerWorker 方法logger.info({ jobId, ... }, "[Queue]: message"))timeoutgetQueue 直接添加任务(绕过类型检查和验证)console.log(使用 logger)QueueJobsMapping 中关联队列和任务import { BullMQService } from "@/lib/infrastructure/effect/services/bullmq";
const program = Effect.gen(function* () {
const qm = yield* BullMQService;
yield* qm.addJob(QueueName.EMAIL, JobName.EMAIL_SEND_WELCOME, data);
});
定时任务可能需要查询数据库:
import { subDays } from "date-fns";
import { lt } from "drizzle-orm";
import db from "@/db";
import { auditLogs } from "@/db/schema";
// Worker processor 中
const processor = async (job: Job<CleanupData>) => {
const { daysToKeep } = job.data;
const cutoffDate = subDays(new Date(), daysToKeep);
await db
.delete(auditLogs)
.where(lt(auditLogs.createdAt, cutoffDate.toISOString()));
logger.info({ daysToKeep }, "[Cleanup]: 旧日志已清理");
};
访问 /api/admin/queue-board 查看:
在应用启动时(src/lib/infrastructure/bootstrap.ts)注册所有 Worker。
BullMQ 自动重试(默认 3 次)。Worker 可配置:
registerWorker(queueName, processor, {
attempts: 5,
backoff: { type: "exponential", delay: 2000 },
});
使用 scheduleJob(内部调用 upsertJobScheduler),相同 schedulerId 会覆盖。
参考 src/lib/infrastructure/__tests__/bullmq-adapter.test.ts:
maxRetriesPerRequest: null)