Skip to content

两个实战:1)智能录入——用 withStructuredOutput 从无规则文本提取结构化数据,批量插入 MySQL;2)流式版 mini cursor——用 AIMessageChunk.concat() 拼接流式 chunk,JsonOutputToolsParser 增量解析 tool_call_chunks,Map 记录已打印长度实现逐字打印效果。withStructuredOutput 底层有三种机制:tool、json_schema、output_parser,平时直接用 withStructuredOutput 就行。

Output Parser 实战:智能录入 + 流式版 Mini Cursor

两个实战练习:结构化输出做智能录入、流式输出做打字机效果的 mini cursor。

实战一:智能录入(withStructuredOutput + MySQL)

传统录入:填表单 / 上传 Excel。AI 时代:给一段文本,大模型自动提取结构化数据。

定义 Schema

js
import { z } from 'zod';

const friendSchema = z.object({
  name: z.string().describe('姓名'),
  gender: z.string().describe('性别(男/女)'),
  birth_date: z.string().describe('出生日期,格式:YYYY-MM-DD,根据年龄估算'),
  company: z.string().nullable().describe('公司名称'),
  title: z.string().nullable().describe('职位/头衔'),
  phone: z.string().nullable().describe('手机号'),
  wechat: z.string().nullable().describe('微信号'),
});

const friendsArraySchema = z.array(friendSchema).describe('好友信息数组');

提取 + 插入数据库

js
const structuredModel = model.withStructuredOutput(friendsArraySchema);

async function extractAndInsert(text) {
  const prompt = `从以下文本中提取一个或多个人的信息:\n${text}`;

  // 1. 大模型提取结构化数据
  const results = await structuredModel.invoke(prompt);

  // 2. 批量插入 MySQL
  const insertSql = `INSERT INTO friends (name, gender, birth_date, company, title, phone, wechat) VALUES ?`;
  const values = results.map(item => [
    item.name, item.gender, item.birth_date,
    item.company, item.title, item.phone, item.wechat,
  ]);

  const [insertResult] = await connection.query(insertSql, [values]);
  console.log(`✅ 成功插入 ${insertResult.affectedRows} 条数据`);
}

输入一段无规则文本,AI 自动提取出结构化数据并批量入库。

补充:withStructuredOutput 的三种底层机制

text
1. Tool Call — 绑定工具 schema,模型返回 args
2. JSON Schema — 原生 response_format.json_schema,模型层面保证格式
3. Output Parser — prompt 加格式说明 + parse 响应

平时直接用 withStructuredOutput(schema) 就行,它会根据模型自动选择最优方案。

实战二:流式版 Mini Cursor

之前 mini cursor 等一分钟才能看到结果。学了流式 + output parser 后,可以实现打字机效果。

核心思路

text
非流式:invoke → AIMessage(包含完整 tool_calls)→ 执行工具
流式:  stream → AIMessageChunk → concat 拼接 → 增量解析 tool_call_chunks → 逐字打印

关键代码

js
import { JsonOutputToolsParser } from '@langchain/core/output_parsers/openai_tools';

const toolParser = new JsonOutputToolsParser();
const printedLengths = new Map();  // 记录每个 tool call 已打印的长度

// 流式处理
for await (const chunk of rawStream) {
  // 1. 拼接 AIMessageChunk
  fullAIMessage = fullAIMessage ? fullAIMessage.concat(chunk) : chunk;

  // 2. 增量解析 tool call 参数
  let parsedTools = null;
  try {
    parsedTools = await toolParser.parseResult([{ message: fullAIMessage }]);
  } catch (e) {
    // JSON 还不完整,继续累积
  }

  // 3. 增量打印 tool call 的 args
  if (parsedTools?.length > 0) {
    for (const toolCall of parsedTools) {
      if (toolCall.name === 'write_file') {
        const toolCallId = toolCall.id || 'default';
        const currentContent = String(toolCall.args.content);
        const previousLength = printedLengths.get(toolCallId) || 0;

        if (previousLength === 0) {
          console.log(`\n[工具调用] write_file("${toolCall.args.filePath}") - 流式预览\n`);
        }

        if (currentContent.length > previousLength) {
          process.stdout.write(currentContent.slice(previousLength));  // 增量打印
          printedLengths.set(toolCallId, currentContent.length);
        }
      }
    }
  } else {
    // 普通文本内容,直接打印
    if (chunk.content) process.stdout.write(chunk.content);
  }
}

// 4. 拼接完成后存入 memory
await history.addMessage(fullAIMessage);

// 5. 从完整的 AIMessage 取 tool_calls 执行工具
for (const toolCall of fullAIMessage.tool_calls) {
  const foundTool = tools.find(t => t.name === toolCall.name);
  const toolResult = await foundTool.invoke(toolCall.args);
  await history.addMessage(new ToolMessage({ content: toolResult, tool_call_id: toolCall.id }));
}

流式处理流程

text
chunk 到达
  → concat 拼接成 fullAIMessage
  → JsonOutputToolsParser 解析(可能失败,JSON 不完整)
  → 解析成功:增量打印 tool call 的 args
  → 普通文本:直接打印 content
  → 全部完成后:从 fullAIMessage 取 tool_calls 执行工具

要点

  • 智能录入 = withStructuredOutput + 数据库 — 从无规则文本提取结构化数据,批量入库
  • 流式打字机 = concat + JsonOutputToolsParser + Map — 增量解析 + 增量打印
  • AIMessageChunk.concat() — 流式拼接的关键 API
  • JsonOutputToolsParser.parseResult() — 增量解析 tool call 的 args
  • Map 记录已打印长度 — 实现增量打印,不重复打印
  • withStructuredOutput 三种底层机制 — tool、json_schema、output_parser,自动选最优