(LLM系列)流式输出(Streaming)实现:提升用户体验

(LLM系列)流式输出(Streaming)实现:提升用户体验


SSE Streaming Next.js Qwen AI

流式输出(Streaming)实现:提升用户体验

在现代 Web 应用中,用户体验的关键在于响应速度和交互反馈。当处理耗时操作时,传统的”等待-返回”模式往往让用户感到焦虑。流式输出(Streaming)技术通过逐步返回数据,让用户实时看到处理进度,极大提升了体验感知。本文将深入探讨如何在 Qwen Chatbot 项目中使用 SSE(Server-Sent Events)和异步处理实现流式输出。

为什么需要流式输出?

想象一个场景:用户向 AI 助手提问,传统方式需要等待完整答案生成后才能看到结果,可能需要等待数十秒。而流式输出允许答案逐字逐句地呈现,就像真人对话一样自然。这种即时反馈不仅减少了感知等待时间,还增强了应用的互动性。

流式输出的典型应用场景包括:

  • AI 对话系统(ChatGPT 式交互)
  • 大文件处理进度
  • 实时日志输出
  • 数据分析报告生成

技术选型:为什么选择 SSE?

在实现流式数据传输时,我们有几种选择:WebSocket、HTTP/2 Server Push 和 SSE。对于单向数据流(服务器到客户端),SSE 是最优方案:

  • 简单易用:基于 HTTP 协议,无需复杂握手
  • 自动重连:浏览器原生支持断线重连
  • 轻量级:相比 WebSocket 更节省资源
  • 防火墙友好:使用标准 HTTP 端口

Qwen Chatbot 项目中的实现方案

服务端:API 路由实现

Next.js 的 Pages Router 提供了强大的 API 路由功能,非常适合实现 SSE。以下是 Qwen Chatbot 项目中的完整实现示例:

pages/api/qwen.ts
import type { NextApiRequest, NextApiResponse } from 'next';
import OpenAI from 'openai';
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
if (req.method !== 'POST') {
return res.status(405).json({ error: 'Method not allowed' });
}
const { messages, stream = false, model, temperature = 0.7, top_p = 0.9, max_tokens = 2048 } = req.body;
// 验证必需字段
if (!messages || !Array.isArray(messages)) {
return res.status(400).json({ error: 'Messages are required and must be an array' });
}
try {
// 创建 OpenAI 兼容的客户端,适配通义千问
const client = new OpenAI({
apiKey: process.env.OPENAI_API_KEY || '',
baseURL: process.env.OPENAI_API_BASE || 'https://dashscope.aliyuncs.com/compatible-mode/v1',
});
if (stream) {
// 使用 TransformStream 实现流式响应
const encoder = new TextEncoder();
const stream = new TransformStream();
const writer = stream.writable.getWriter();
// 异步处理函数
(async () => {
try {
// 通义千问API支持system message,直接使用原始消息
const response = await client.chat.completions.create({
model: model || process.env.MODEL_NAME || 'qwen-max',
messages,
stream: true,
temperature,
top_p,
max_tokens,
stream_options: { include_usage: true }, // 包含使用量信息
});
// 逐块发送数据
for await (const chunk of response) {
const content = chunk.choices[0]?.delta?.content;
// 如果有内容,发送内容数据
if (content) {
const data = `data: ${JSON.stringify({ content })}\n\n`;
await writer.write(encoder.encode(data));
}
// 如果有usage信息,发送token使用数据
if (chunk.usage) {
const tokenData = {
usage: {
prompt_tokens: chunk.usage.prompt_tokens,
completion_tokens: chunk.usage.completion_tokens,
total_tokens: chunk.usage.total_tokens,
}
};
const data = `data: ${JSON.stringify(tokenData)}\n\n`;
await writer.write(encoder.encode(data));
}
}
// 发送结束信号
await writer.write(encoder.encode('data: [DONE]\n\n'));
} catch (error: any) {
// 发送错误信息
await writer.write(
encoder.encode(`data: ${JSON.stringify({ error: error.message || 'AI service error' })}\n\n`)
);
} finally {
await writer.close();
}
})();
// 返回 SSE 响应
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
} else {
// 非流式响应
// 通义千问API支持system message,直接使用原始消息
const response = await client.chat.completions.create({
model: model || process.env.MODEL_NAME || 'qwen-max',
messages,
temperature,
top_p,
max_tokens,
});
const content = response.choices[0]?.message?.content || '';
const usage = response.usage;
res.status(200).json({
content,
usage: usage ? {
prompt_tokens: usage.prompt_tokens,
completion_tokens: usage.completion_tokens,
total_tokens: usage.total_tokens,
} : undefined
});
}
} catch (error: any) {
console.error('Error calling Qwen API:', error);
let errorMessage = 'An error occurred while calling the API';
let statusCode = 500;
if (error.status === 401) {
errorMessage = 'Authentication failed. Please check your API key.';
statusCode = 401;
} else if (error.status === 403) {
errorMessage = 'Access forbidden. Please check your API permissions.';
statusCode = 403;
} else if (error.status === 429) {
errorMessage = 'Rate limit exceeded. Please try again later.';
statusCode = 429;
} else if (error.status === 404 && error.message.includes('model')) {
errorMessage = 'Model not found or access denied. Please check the model name and your API permissions. Try using "qwen-max" instead of "qwen-max-0102".';
statusCode = 404;
} else if (error.message) {
errorMessage = error.message;
}
res.status(statusCode).json({
error: errorMessage,
details: process.env.NODE_ENV === 'development' ? error.toString() : undefined
});
}
}

关键点解析:

  1. TransformStream:Next.js 推荐的流处理方式,比传统的 ReadableStream 更灵活
  2. TextEncoder:将字符串转换为 Uint8Array,符合流传输要求
  3. SSE 格式:数据必须以 data: 开头,以 \n\n 结尾
  4. 异步 IIFE:立即执行的异步函数,避免阻塞响应返回
  5. 通义千问适配:使用 OpenAI 兼容的 API 客户端,适配通义千问 API

客户端:React 组件实现

客户端需要处理 SSE 连接并实时更新 UI,以下是 Qwen Chatbot 项目中的实现:

// pages/chat.tsx (SSE 处理部分)
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
if (!inputMessage.trim() || isLoading) return;
// 添加用户消息
const userMessage = { role: 'user', content: inputMessage };
dispatch({ type: 'ADD_MESSAGE', payload: userMessage });
dispatch({ type: 'SET_INPUT_MESSAGE', payload: '' });
setIsLoading(true);
try {
// 准备消息数组,如果选择了角色并且该角色有系统提示,则在开头添加系统消息
let messagesToSend = [...messages, userMessage];
if (selectedRoleId) {
const selectedRole = roles.find(r => r.id === selectedRoleId);
if (selectedRole && selectedRole.systemPrompt) {
// 检查是否已经有系统消息,如果没有则添加
const hasSystemMessage = messages.some(msg => msg.role === 'system');
if (!hasSystemMessage) {
messagesToSend = [{ role: 'system', content: selectedRole.systemPrompt }, ...messagesToSend];
}
}
}
// 发送请求到后端 API
// 使用流式响应获取实时token使用情况
const response = await fetch('/api/qwen', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
messages: messagesToSend,
stream: true, // 使用流式响应
model: modelConfig.model,
temperature: modelConfig.temperature,
top_p: modelConfig.top_p,
max_tokens: modelConfig.max_tokens,
}),
});
if (!response.ok) {
const errorData = await response.json();
throw new Error(errorData.error || 'Failed to get response from API');
}
// 处理流式响应
const reader = response.body?.getReader();
if (!reader) {
throw new Error('Could not read response body');
}
const decoder = new TextDecoder();
let assistantMessage: Message = { role: 'assistant', content: '', usage: undefined };
// 创建助手消息并添加到消息列表
const newAssistantMessage: Message = { role: 'assistant', content: '', usage: undefined };
dispatch({ type: 'ADD_MESSAGE', payload: newAssistantMessage });
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value, { stream: true });
const lines = chunk.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
const data = line.slice(6); // 移除 'data: ' 前缀
if (data === '[DONE]') {
// 流结束
break;
}
try {
const parsed = JSON.parse(data);
if (parsed.content) {
// 更新最后一条消息的内容
assistantMessage.content += parsed.content;
// 只更新助手消息,保留之前的消息
const updatedMessages = [...messages, { ...assistantMessage }];
dispatch({ type: 'SET_MESSAGES', payload: updatedMessages });
} else if (parsed.usage) {
// 更新最后一条消息的使用情况
assistantMessage.usage = parsed.usage;
const updatedMessages = [...messages, { ...assistantMessage }];
dispatch({ type: 'SET_MESSAGES', payload: updatedMessages });
}
} catch (e) {
// 忽略无法解析的数据行
console.error('Error parsing data:', e);
}
}
}
}
// 在流结束后记录对话历史
const updatedMessages = [...messages, assistantMessage]; // 获取包含最新消息的完整消息列表
const lastAssistantMessage = updatedMessages[updatedMessages.length - 1]; // 最后一条消息应该是助手的回复
if (lastAssistantMessage && lastAssistantMessage.role === 'assistant') {
const newHistoryEntry: ConversationHistory = {
id: Date.now(), // 使用时间戳作为唯一ID
timestamp: new Date().toISOString(),
input: inputMessage,
output: lastAssistantMessage.content,
model: modelConfig.model,
params: {
temperature: modelConfig.temperature,
top_p: modelConfig.top_p,
max_tokens: modelConfig.max_tokens,
},
tokenUsage: assistantMessage.usage ? {
prompt_tokens: assistantMessage.usage.prompt_tokens,
completion_tokens: assistantMessage.usage.completion_tokens,
total_tokens: assistantMessage.usage.total_tokens
} : undefined,
evaluation: '' // 可以让使用者手动填写或系统自动生成
};
dispatch({ type: 'ADD_TO_HISTORY', payload: newHistoryEntry }); // 添加到历史记录开头
}
} catch (error: any) {
console.error('Error:', error);
dispatch({ type: 'ADD_MESSAGE', payload: {
role: 'assistant',
content: `Error: ${error.message || 'An unknown error occurred'}`
}});
// 即使出错也记录历史
const errorMessage = `Error: ${error.message || 'An unknown error occurred'}`;
const newHistoryEntry: ConversationHistory = {
id: Date.now(), // 使用时间戳作为唯一ID
timestamp: new Date().toISOString(),
input: inputMessage,
output: errorMessage,
model: modelConfig.model,
params: {
temperature: modelConfig.temperature,
top_p: modelConfig.top_p,
max_tokens: modelConfig.max_tokens,
},
tokenUsage: undefined, // 错误情况下无token使用数据
evaluation: 'Error occurred' // 标记为错误
};
dispatch({ type: 'ADD_TO_HISTORY', payload: newHistoryEntry }); // 添加到历史记录开头
} finally {
setIsLoading(false);
}
};

核心实现要点:

  1. ReadableStream Reader:使用 getReader() 逐块读取数据
  2. TextDecoder:将二进制数据解码为字符串
  3. 状态更新:通过 Redux-like 状态管理更新消息
  4. 错误处理:妥善处理解析错误和网络异常
  5. Token 使用情况:实时更新 API 调用的 token 使用情况

前端打字机效果实现

为了让流式输出看起来更自然,我们在前端实现了打字机效果:

components/TypeWriterEffect.tsx
import React, { useState, useEffect, useRef } from 'react';
import styles from '../styles/TypeWriterEffect.module.css';
interface TypeWriterEffectProps {
text: string;
speed?: number; // 打字速度,毫秒/字符
className?: string; // 自定义类名
}
const TypeWriterEffect: React.FC<TypeWriterEffectProps> = ({
text,
speed = 50, // 放慢速度到50ms/字符,让效果更明显
className = ''
}) => {
const [displayedText, setDisplayedText] = useState('');
const [isTyping, setIsTyping] = useState(true);
const timeoutRef = useRef<NodeJS.Timeout | null>(null);
useEffect(() => {
// 每次text变化时重置
setDisplayedText('');
setIsTyping(true);
// 清除之前的定时器
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
}
// 如果文本为空,直接返回
if (!text) {
setIsTyping(false);
return;
}
// 开始打字
let index = 0;
const typeNextChar = () => {
if (index < text.length) {
const char = text[index];
// 确保字符不是undefined或null
if (char !== undefined && char !== null) {
// 强制更新,避免React优化
setDisplayedText(prev => prev + String(char));
}
index++;
timeoutRef.current = setTimeout(typeNextChar, speed);
} else {
setIsTyping(false);
}
};
timeoutRef.current = setTimeout(typeNextChar, speed);
// 清理
return () => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
}
};
}, [text, speed]);
return (
<span className={`${styles.typeWriterText} ${className}`}>
{displayedText}
{isTyping && <span className={styles.cursor}>|</span>}
</span>
);
};
export default TypeWriterEffect;

性能优化技巧

1. 背压处理(Backpressure)

当客户端处理速度跟不上服务端发送速度时,需要实现背压机制:

const writer = stream.writable.getWriter();
async function writeWithBackpressure(data: string) {
await writer.ready; // 等待缓冲区可写
await writer.write(encoder.encode(data));
}

2. 分块策略

合理控制每次发送的数据量,避免过小(频繁网络开销)或过大(失去流式效果):

let buffer = '';
const CHUNK_SIZE = 50; // 每 50 个字符发送一次
for (const char of response) {
buffer += char;
if (buffer.length >= CHUNK_SIZE) {
await writeWithBackpressure(`data: ${JSON.stringify({ content: buffer })}\n\n`);
buffer = '';
}
}

3. 连接管理

实现心跳检测,防止连接意外断开:

// 服务端定期发送心跳
const heartbeatInterval = setInterval(() => {
writer.write(encoder.encode(': heartbeat\n\n'));
}, 30000);
// 清理
process.on('exit', () => clearInterval(heartbeatInterval));

实战案例:Qwen Chatbot 中的集成

在 Qwen Chatbot 项目中,我们将以上技术整合到了真实的 AI 对话系统中:

  1. 消息组件集成:在 ChatWindow 组件中使用 TypeWriterEffect 显示助手回复
  2. 状态管理:使用全局状态管理器跟踪消息流
  3. 实时更新:SSE 流实时更新助手消息内容
  4. 打字效果:前端实现的打字机效果增强用户体验
components/ChatWindow.tsx
import TypeWriterEffect from './TypeWriterEffect';
// ...
{messages.map((msg, index) => (
<div key={index} className={`${styles.message} ${styles[msg.role]}`}>
<div className={styles.avatar}>
{msg.role === 'user' ? '👤' : '🤖'}
</div>
<div className={styles.content}>
{msg.role === 'assistant' ? (
<TypeWriterEffect text={msg.content} speed={20} />
) : (
msg.content
)}
{msg.usage && (
<div className={styles.tokenInfo}>
Tokens: {msg.usage.total_tokens} (Prompt: {msg.usage.prompt_tokens}, Completion: {msg.usage.completion_tokens})
</div>
)}
</div>
</div>
))}

注意事项与最佳实践

  1. 超时处理:设置合理的超时时间,避免连接永久挂起
  2. 错误恢复:客户端应实现重试机制,处理网络波动
  3. 资源清理:确保 writer 和 reader 正确关闭,防止内存泄漏
  4. CORS 配置:跨域场景需要正确配置响应头
  5. 进度指示:提供明确的加载状态,让用户知道系统正在工作
  6. 打字机效果优化:不能依赖 SSE 返回粒度,必须在前端主动控制显示节奏
  7. API 兼容性:适配不同 LLM 提供商的 API 格式差异

总结

流式输出通过 SSE 和异步处理技术,将”等待-返回”的交互模式转变为”实时反馈”的体验。在 Qwen Chatbot 项目中,借助 Next.js 和 Web Streams API,我们优雅地实现了这一功能。无论是 AI 对话、数据处理还是实时日志,流式输出都能显著提升用户体验。

通过结合后端流式传输和前端打字机效果,我们实现了既高效又直观的用户交互体验。随着 Web 技术的发展,流式处理将成为构建现代 AI 应用的标配能力。掌握这项技术,让你的应用更加流畅、响应更加迅速,为用户带来更好的交互体验。


项目地址