一键导入
langchain-streaming
Stream outputs from LangChain agents and models - includes stream modes, token streaming, progress updates, and real-time feedback
用 Codex 或 Claude 帮你安装 复制这段 Prompt,粘贴到 Codex、Claude 或其他助手里,让它检查 Skill 页面并帮你完成安装。
菜单
Stream outputs from LangChain agents and models - includes stream modes, token streaming, progress updates, and real-time feedback
用 Codex 或 Claude 帮你安装 复制这段 Prompt,粘贴到 Codex、Claude 或其他助手里,让它检查 Skill 页面并帮你完成安装。
基于 SOC 职业分类
| name | langchain-streaming |
| description | Stream outputs from LangChain agents and models - includes stream modes, token streaming, progress updates, and real-time feedback |
| language | js |
流式传输让您能够在 LangChain 代理和模型运行时实时显示更新。您可以逐步显示输出,而不是等待完整的响应,从而改善用户体验,特别是对于长时间运行的操作。
核心概念:
| 场景 | 流式传输? | 原因 |
|---|---|---|
| 长模型响应 | ✅ 是 | 在生成时显示 token |
| 多步骤代理任务 | ✅ 是 | 通过步骤显示进度 |
| 长时间运行的工具 | ✅ 是 | 提供进度更新 |
| 简单快速请求 | ⚠️ 可能 | 开销可能不值得 |
| 后端批处理 | ❌ 否 | 没有用户等待更新 |
| 模式 | 何时使用 | 返回 |
|---|---|---|
"values" | 需要每步后的完整状态 | 完整状态对象 |
"updates" | 仅需要变化的内容 | 状态增量 |
"messages" | 需要 LLM token 流 | [token, metadata] 元组 |
"custom" | 需要自定义进度信号 | 用户定义的数据 |
| 多个模式 | 需要组合数据 | 模式数组 |
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({ model: "gpt-4.1" });
// 在 token 到达时流式传输
const stream = await model.stream("用简单的术语解释量子计算");
for await (const chunk of stream) {
process.stdout.write(chunk.content);
}
// 输出渐进出现:"量子" "计算" "是" ...
import { createAgent } from "langchain";
const agent = createAgent({
model: "gpt-4.1",
tools: [searchTool, calculatorTool],
});
// 使用 "updates" 模式流式传输代理步骤
for await (const chunk of await agent.stream(
{ messages: [{ role: "user", content: "搜索 AI 新闻并总结" }] },
{ streamMode: "updates" }
)) {
console.log("步骤:", JSON.stringify(chunk, null, 2));
}
// 显示每个步骤:模型调用、工具执行、最终响应
import { createAgent } from "langchain";
const agent = createAgent({
model: "gpt-4.1",
tools: [searchTool],
});
// 流式传输 LLM token 和代理进度
for await (const [mode, chunk] of await agent.stream(
{ messages: [{ role: "user", content: "研究 LangChain" }] },
{ streamMode: ["updates", "messages"] }
)) {
if (mode === "messages") {
// LLM token 流
const [token, metadata] = chunk;
if (token.content) {
process.stdout.write(token.content);
}
} else if (mode === "updates") {
// 代理步骤更新
console.log("\n步骤更新:", chunk);
}
}
import { createAgent } from "langchain";
const agent = createAgent({
model: "gpt-4.1",
tools: [weatherTool],
});
// 在每步后获取完整状态
for await (const state of await agent.stream(
{ messages: [{ role: "user", content: "天气怎么样?" }] },
{ streamMode: "values" }
)) {
console.log("当前消息数:", state.messages.length);
console.log("最后一条消息:", state.messages[state.messages.length - 1].content);
}
import { tool } from "langchain";
import { z } from "zod";
const processData = tool(
async ({ data }, { runtime }) => {
const total = data.length;
for (let i = 0; i < total; i += 100) {
// 发出自定义进度更新
await runtime.stream_writer.write({
type: "progress",
data: {
processed: i,
total: total,
percentage: (i / total) * 100,
},
});
// 进行实际处理
await processChunk(data.slice(i, i + 100));
}
return "处理完成";
},
{
name: "process_data",
description: "处理数据并提供进度更新",
schema: z.object({
data: z.array(z.any()),
}),
}
);
// 流式传输自定义更新
for await (const [mode, chunk] of await agent.stream(
{ messages: [{ role: "user", content: "处理这些数据" }] },
{ streamMode: ["custom", "updates"] }
)) {
if (mode === "custom") {
console.log(`进度:${chunk.data.percentage}%`);
}
}
import { createAgent } from "langchain";
const agent = createAgent({
model: "gpt-4.1",
tools: [searchTool],
});
// Express.js 端点
app.post("/api/chat", async (req, res) => {
// 为服务器发送事件设置标头
res.setHeader("Content-Type", "text/event-stream");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
try {
for await (const [mode, chunk] of await agent.stream(
{ messages: req.body.messages },
{ streamMode: ["messages", "updates"] }
)) {
if (mode === "messages") {
const [token, metadata] = chunk;
if (token.content) {
// 向客户端发送 token
res.write(`data: ${JSON.stringify({ type: "token", content: token.content })}\n\n`);
}
} else if (mode === "updates") {
// 向客户端发送步骤更新
res.write(`data: ${JSON.stringify({ type: "step", data: chunk })}\n\n`);
}
}
res.write("data: [DONE]\n\n");
res.end();
} catch (error) {
res.write(`data: ${JSON.stringify({ type: "error", message: error.message })}\n\n`);
res.end();
}
});
import { createAgent } from "langchain";
const agent = createAgent({
model: "gpt-4.1",
tools: [riskyTool],
});
try {
for await (const chunk of await agent.stream(
{ messages: [{ role: "user", content: "执行风险操作" }] },
{ streamMode: "updates" }
)) {
// 检查更新中的错误
if ("__error__" in chunk) {
console.error("流式传输错误:", chunk.__error__);
break;
}
console.log("更新:", chunk);
}
} catch (error) {
console.error("流式传输错误:", error);
}
import { createAgent } from "langchain";
const agent = createAgent({
model: "gpt-4.1",
tools: [slowTool],
});
async function streamWithTimeout(timeoutMs: number) {
const timeout = setTimeout(() => {
throw new Error(`流式传输在 ${timeoutMs}ms 后超时`);
}, timeoutMs);
try {
for await (const chunk of await agent.stream(
{ messages: [{ role: "user", content: "做一些慢操作" }] },
{ streamMode: "updates" }
)) {
clearTimeout(timeout);
console.log(chunk);
// 为下一个块重置超时
timeout.setTimeout(() => {
throw new Error(`流式传输在 ${timeoutMs}ms 后超时`);
}, timeoutMs);
}
} finally {
clearTimeout(timeout);
}
}
import { ChatOpenAI } from "@langchain/openai";
const model = new ChatOpenAI({ model: "gpt-4.1" });
let buffer = "";
const stream = await model.stream("写一篇长文章");
for await (const chunk of stream) {
buffer += chunk.content;
// 每 10 个字符或完整单词更新一次 UI
if (buffer.length >= 10 || chunk.content.includes(" ")) {
console.log(buffer);
buffer = "";
}
}
// 刷新剩余缓冲区
if (buffer) {
console.log(buffer);
}
✅ 流式模式:选择要流式传输的数据 ✅ 多个模式:结合不同的流式类型 ✅ 自定义更新:发出用户定义的进度数据 ✅ 块处理:根据需要处理每个块 ✅ 错误处理:捕获和处理流式传输错误
❌ 块大小:由模型/提供商确定 ❌ 块时间:提供商发送时到达 ❌ 保证顺序:异步流可能不同 ❌ 修改过去的块:块是不可变的
// ❌ 问题:缺少 await
const stream = agent.stream(input, { streamMode: "updates" });
for await (const chunk of stream) { // 错误:stream 是 Promise!
console.log(chunk);
}
// ✅ 解决方案:等待流式传输初始化
const stream = await agent.stream(input, { streamMode: "updates" });
for await (const chunk of stream) {
console.log(chunk);
}
// ❌ 问题:messages 模式的错误属性
for await (const chunk of await agent.stream(input, { streamMode: "messages" })) {
console.log(chunk.content); // undefined!
}
// ✅ 解决方案:messages 模式返回 [token, metadata] 元组
for await (const chunk of await agent.stream(input, { streamMode: "messages" })) {
const [token, metadata] = chunk;
console.log(token.content); // 正确!
}
// ❌ 问题:使用错误的模式进行 token 流式传输
for await (const chunk of await agent.stream(input, { streamMode: "updates" })) {
console.log(chunk.content); // 不是 updates 的工作方式!
}
// ✅ 解决方案:使用 "messages" 模式进行 token 流式传输
for await (const chunk of await agent.stream(input, { streamMode: "messages" })) {
const [token, metadata] = chunk;
console.log(token.content);
}
// ❌ 问题:没有正确清理
for await (const chunk of await agent.stream(input)) {
if (someCondition) {
break; // 流式传输可能无法正确清理
}
}
// ✅ 解决方案:使用 try/finally 或显式清理
const stream = await agent.stream(input);
try {
for await (const chunk of stream) {
if (someCondition) {
break;
}
}
} finally {
// 如果需要,进行清理
}
// ❌ 问题:未处理不同的模式
for await (const chunk of await agent.stream(
input,
{ streamMode: ["updates", "messages"] }
)) {
console.log(chunk); // 这是哪种模式?
}
// ✅ 解决方案:解构模式
for await (const [mode, chunk] of await agent.stream(
input,
{ streamMode: ["updates", "messages"] }
)) {
if (mode === "messages") {
const [token, metadata] = chunk;
console.log(token.content);
} else if (mode === "updates") {
console.log("步骤:", chunk);
}
}
Mac 系统深度清理和优化工具。使用 Mole (mo 命令) 执行系统清理、磁盘分析、应用卸载、系统优化等任务。 触发场景(当用户提到以下任一内容时使用此 skill): - 清理 Mac、清理磁盘、释放空间、清理缓存、清理系统 - 卸载应用、删除应用、移除应用及其残留 - 磁盘分析、查看磁盘占用、大文件查找、空间分析 - 系统优化、系统维护、刷新系统、重建缓存 - 系统状态、系统监控、CPU/内存/磁盘监控 - 清理 node_modules、清理构建产物、清理项目依赖 - 清理安装包、删除 dmg/pkg 文件 - Mac 清理工具、类似 CleanMyMac 的功能 - "我的 Mac 太慢了"、"磁盘空间不足"、"电脑卡顿" - 即使没有明确说 "Mole",只要涉及上述场景就应使用
快速搭建和配置 pnpm monorepo 项目结构,包含 TypeScript、tsup 构建、私有 npm registry 配置。当用户需要"创建 monorepo"、"初始化 monorepo 项目"、"配置 pnpm workspace"、"设置 monorepo 构建"、"monorepo setup"时使用。特别适合需要统一管理多个包、配置构建工具、处理 TypeScript 路径问题的场景。即使用户只是说"帮我搭建项目结构"或"配置构建",如果涉及多包管理也应该使用此 skill。
智能拆分暂存区的代码变更为多个符合 Conventional Commits 规范的逻辑提交。当用户需要将大量变更按逻辑关系分组提交时使用,比如"拆分这些提交"、"把暂存区的变更分成多个 commit"、"按功能分别提交"、"split commits"等场景。特别适合处理包含多个模块、多种类型文件(配置、代码、测试、文档)的复杂变更集。
OKR 优化与质量评估专家。当用户需要:(1) 评估现有 OKR 的质量,(2) 优化模糊或不可量化的关键结果,(3) 检查 OKR 是否符合核心原则(聚焦、可量化、有挑战),(4) 将任务型 KR 转化为结果型 KR,(5) 提供具体的改进建议时使用。触发词包括"帮我优化 OKR"、"检查这个 OKR"、"这个 KR 写得好吗"、"如何量化这个目标"。
基于 git commits 自动生成 CHANGELOG.md 变更日志。支持语义化版本、分类整理、多格式输出。触发场景包括"生成变更日志"、"更新 CHANGELOG"、"版本记录"。
GitHub PR 代码审查技能。检查代码质量、安全性、性能和最佳实践,生成结构化审查报告。触发场景包括"审查 PR"、"代码检查"、"review pull request"。