(LLM系列)流式输出(Streaming)实现:提升用户体验
SSE Streaming Next.js Qwen AI
流式输出(Streaming)实现:提升用户体验
在现代 Web 应用中,用户体验的关键在于响应速度和交互反馈。当处理耗时操作时,传统的”等待-返回”模式往往让用户感到焦虑。流式输出(Streaming)技术通过逐步返回数据,让用户实时看到处理进度,极大提升了体验感知。本文将深入探讨如何在 Qwen Chatbot 项目中使用 SSE(Server-Sent Events)和异步处理实现流式输出。
为什么需要流式输出?
想象一个场景:用户向 AI 助手提问,传统方式需要等待完整答案生成后才能看到结果,可能需要等待数十秒。而流式输出允许答案逐字逐句地呈现,就像真人对话一样自然。这种即时反馈不仅减少了感知等待时间,还增强了应用的互动性。
流式输出的典型应用场景包括:
- AI 对话系统(ChatGPT 式交互)
- 大文件处理进度
- 实时日志输出
- 数据分析报告生成
技术选型:为什么选择 SSE?
在实现流式数据传输时,我们有几种选择:WebSocket、HTTP/2 Server Push 和 SSE。对于单向数据流(服务器到客户端),SSE 是最优方案:
- 简单易用:基于 HTTP 协议,无需复杂握手
- 自动重连:浏览器原生支持断线重连
- 轻量级:相比 WebSocket 更节省资源
- 防火墙友好:使用标准 HTTP 端口
Qwen Chatbot 项目中的实现方案
服务端:API 路由实现
Next.js 的 Pages Router 提供了强大的 API 路由功能,非常适合实现 SSE。以下是 Qwen Chatbot 项目中的完整实现示例:
import type { NextApiRequest, NextApiResponse } from 'next';import OpenAI from 'openai';
export default async function handler(req: NextApiRequest, res: NextApiResponse) { if (req.method !== 'POST') { return res.status(405).json({ error: 'Method not allowed' }); }
const { messages, stream = false, model, temperature = 0.7, top_p = 0.9, max_tokens = 2048 } = req.body;
// 验证必需字段 if (!messages || !Array.isArray(messages)) { return res.status(400).json({ error: 'Messages are required and must be an array' }); }
try { // 创建 OpenAI 兼容的客户端,适配通义千问 const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY || '', baseURL: process.env.OPENAI_API_BASE || 'https://dashscope.aliyuncs.com/compatible-mode/v1', });
if (stream) { // 使用 TransformStream 实现流式响应 const encoder = new TextEncoder(); const stream = new TransformStream(); const writer = stream.writable.getWriter();
// 异步处理函数 (async () => { try { // 通义千问API支持system message,直接使用原始消息 const response = await client.chat.completions.create({ model: model || process.env.MODEL_NAME || 'qwen-max', messages, stream: true, temperature, top_p, max_tokens, stream_options: { include_usage: true }, // 包含使用量信息 });
// 逐块发送数据 for await (const chunk of response) { const content = chunk.choices[0]?.delta?.content;
// 如果有内容,发送内容数据 if (content) { const data = `data: ${JSON.stringify({ content })}\n\n`; await writer.write(encoder.encode(data)); }
// 如果有usage信息,发送token使用数据 if (chunk.usage) { const tokenData = { usage: { prompt_tokens: chunk.usage.prompt_tokens, completion_tokens: chunk.usage.completion_tokens, total_tokens: chunk.usage.total_tokens, } }; const data = `data: ${JSON.stringify(tokenData)}\n\n`; await writer.write(encoder.encode(data)); } }
// 发送结束信号 await writer.write(encoder.encode('data: [DONE]\n\n')); } catch (error: any) { // 发送错误信息 await writer.write( encoder.encode(`data: ${JSON.stringify({ error: error.message || 'AI service error' })}\n\n`) ); } finally { await writer.close(); } })();
// 返回 SSE 响应 res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); return new Response(stream.readable, { headers: { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', }, }); } else { // 非流式响应 // 通义千问API支持system message,直接使用原始消息 const response = await client.chat.completions.create({ model: model || process.env.MODEL_NAME || 'qwen-max', messages, temperature, top_p, max_tokens, });
const content = response.choices[0]?.message?.content || ''; const usage = response.usage;
res.status(200).json({ content, usage: usage ? { prompt_tokens: usage.prompt_tokens, completion_tokens: usage.completion_tokens, total_tokens: usage.total_tokens, } : undefined }); } } catch (error: any) { console.error('Error calling Qwen API:', error);
let errorMessage = 'An error occurred while calling the API'; let statusCode = 500;
if (error.status === 401) { errorMessage = 'Authentication failed. Please check your API key.'; statusCode = 401; } else if (error.status === 403) { errorMessage = 'Access forbidden. Please check your API permissions.'; statusCode = 403; } else if (error.status === 429) { errorMessage = 'Rate limit exceeded. Please try again later.'; statusCode = 429; } else if (error.status === 404 && error.message.includes('model')) { errorMessage = 'Model not found or access denied. Please check the model name and your API permissions. Try using "qwen-max" instead of "qwen-max-0102".'; statusCode = 404; } else if (error.message) { errorMessage = error.message; }
res.status(statusCode).json({ error: errorMessage, details: process.env.NODE_ENV === 'development' ? error.toString() : undefined }); }}关键点解析:
- TransformStream:Next.js 推荐的流处理方式,比传统的 ReadableStream 更灵活
- TextEncoder:将字符串转换为 Uint8Array,符合流传输要求
- SSE 格式:数据必须以
data:开头,以\n\n结尾 - 异步 IIFE:立即执行的异步函数,避免阻塞响应返回
- 通义千问适配:使用 OpenAI 兼容的 API 客户端,适配通义千问 API
客户端:React 组件实现
客户端需要处理 SSE 连接并实时更新 UI,以下是 Qwen Chatbot 项目中的实现:
// pages/chat.tsx (SSE 处理部分)const handleSubmit = async (e: React.FormEvent) => { e.preventDefault(); if (!inputMessage.trim() || isLoading) return;
// 添加用户消息 const userMessage = { role: 'user', content: inputMessage }; dispatch({ type: 'ADD_MESSAGE', payload: userMessage }); dispatch({ type: 'SET_INPUT_MESSAGE', payload: '' }); setIsLoading(true);
try { // 准备消息数组,如果选择了角色并且该角色有系统提示,则在开头添加系统消息 let messagesToSend = [...messages, userMessage];
if (selectedRoleId) { const selectedRole = roles.find(r => r.id === selectedRoleId); if (selectedRole && selectedRole.systemPrompt) { // 检查是否已经有系统消息,如果没有则添加 const hasSystemMessage = messages.some(msg => msg.role === 'system'); if (!hasSystemMessage) { messagesToSend = [{ role: 'system', content: selectedRole.systemPrompt }, ...messagesToSend]; } } }
// 发送请求到后端 API // 使用流式响应获取实时token使用情况 const response = await fetch('/api/qwen', { method: 'POST', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ messages: messagesToSend, stream: true, // 使用流式响应 model: modelConfig.model, temperature: modelConfig.temperature, top_p: modelConfig.top_p, max_tokens: modelConfig.max_tokens, }), });
if (!response.ok) { const errorData = await response.json(); throw new Error(errorData.error || 'Failed to get response from API'); }
// 处理流式响应 const reader = response.body?.getReader(); if (!reader) { throw new Error('Could not read response body'); }
const decoder = new TextDecoder(); let assistantMessage: Message = { role: 'assistant', content: '', usage: undefined };
// 创建助手消息并添加到消息列表 const newAssistantMessage: Message = { role: 'assistant', content: '', usage: undefined }; dispatch({ type: 'ADD_MESSAGE', payload: newAssistantMessage });
while (true) { const { done, value } = await reader.read(); if (done) break;
const chunk = decoder.decode(value, { stream: true }); const lines = chunk.split('\n');
for (const line of lines) { if (line.startsWith('data: ')) { const data = line.slice(6); // 移除 'data: ' 前缀
if (data === '[DONE]') { // 流结束 break; }
try { const parsed = JSON.parse(data); if (parsed.content) { // 更新最后一条消息的内容 assistantMessage.content += parsed.content; // 只更新助手消息,保留之前的消息 const updatedMessages = [...messages, { ...assistantMessage }]; dispatch({ type: 'SET_MESSAGES', payload: updatedMessages }); } else if (parsed.usage) { // 更新最后一条消息的使用情况 assistantMessage.usage = parsed.usage; const updatedMessages = [...messages, { ...assistantMessage }]; dispatch({ type: 'SET_MESSAGES', payload: updatedMessages }); } } catch (e) { // 忽略无法解析的数据行 console.error('Error parsing data:', e); } } } }
// 在流结束后记录对话历史 const updatedMessages = [...messages, assistantMessage]; // 获取包含最新消息的完整消息列表 const lastAssistantMessage = updatedMessages[updatedMessages.length - 1]; // 最后一条消息应该是助手的回复
if (lastAssistantMessage && lastAssistantMessage.role === 'assistant') { const newHistoryEntry: ConversationHistory = { id: Date.now(), // 使用时间戳作为唯一ID timestamp: new Date().toISOString(), input: inputMessage, output: lastAssistantMessage.content, model: modelConfig.model, params: { temperature: modelConfig.temperature, top_p: modelConfig.top_p, max_tokens: modelConfig.max_tokens, }, tokenUsage: assistantMessage.usage ? { prompt_tokens: assistantMessage.usage.prompt_tokens, completion_tokens: assistantMessage.usage.completion_tokens, total_tokens: assistantMessage.usage.total_tokens } : undefined, evaluation: '' // 可以让使用者手动填写或系统自动生成 };
dispatch({ type: 'ADD_TO_HISTORY', payload: newHistoryEntry }); // 添加到历史记录开头 } } catch (error: any) { console.error('Error:', error); dispatch({ type: 'ADD_MESSAGE', payload: { role: 'assistant', content: `Error: ${error.message || 'An unknown error occurred'}` }});
// 即使出错也记录历史 const errorMessage = `Error: ${error.message || 'An unknown error occurred'}`; const newHistoryEntry: ConversationHistory = { id: Date.now(), // 使用时间戳作为唯一ID timestamp: new Date().toISOString(), input: inputMessage, output: errorMessage, model: modelConfig.model, params: { temperature: modelConfig.temperature, top_p: modelConfig.top_p, max_tokens: modelConfig.max_tokens, }, tokenUsage: undefined, // 错误情况下无token使用数据 evaluation: 'Error occurred' // 标记为错误 };
dispatch({ type: 'ADD_TO_HISTORY', payload: newHistoryEntry }); // 添加到历史记录开头 } finally { setIsLoading(false); }};核心实现要点:
- ReadableStream Reader:使用
getReader()逐块读取数据 - TextDecoder:将二进制数据解码为字符串
- 状态更新:通过 Redux-like 状态管理更新消息
- 错误处理:妥善处理解析错误和网络异常
- Token 使用情况:实时更新 API 调用的 token 使用情况
前端打字机效果实现
为了让流式输出看起来更自然,我们在前端实现了打字机效果:
import React, { useState, useEffect, useRef } from 'react';import styles from '../styles/TypeWriterEffect.module.css';
interface TypeWriterEffectProps { text: string; speed?: number; // 打字速度,毫秒/字符 className?: string; // 自定义类名}
const TypeWriterEffect: React.FC<TypeWriterEffectProps> = ({ text, speed = 50, // 放慢速度到50ms/字符,让效果更明显 className = ''}) => { const [displayedText, setDisplayedText] = useState(''); const [isTyping, setIsTyping] = useState(true); const timeoutRef = useRef<NodeJS.Timeout | null>(null);
useEffect(() => { // 每次text变化时重置 setDisplayedText(''); setIsTyping(true);
// 清除之前的定时器 if (timeoutRef.current) { clearTimeout(timeoutRef.current); }
// 如果文本为空,直接返回 if (!text) { setIsTyping(false); return; }
// 开始打字 let index = 0; const typeNextChar = () => { if (index < text.length) { const char = text[index]; // 确保字符不是undefined或null if (char !== undefined && char !== null) { // 强制更新,避免React优化 setDisplayedText(prev => prev + String(char)); } index++; timeoutRef.current = setTimeout(typeNextChar, speed); } else { setIsTyping(false); } };
timeoutRef.current = setTimeout(typeNextChar, speed);
// 清理 return () => { if (timeoutRef.current) { clearTimeout(timeoutRef.current); } }; }, [text, speed]);
return ( <span className={`${styles.typeWriterText} ${className}`}> {displayedText} {isTyping && <span className={styles.cursor}>|</span>} </span> );};
export default TypeWriterEffect;性能优化技巧
1. 背压处理(Backpressure)
当客户端处理速度跟不上服务端发送速度时,需要实现背压机制:
const writer = stream.writable.getWriter();
async function writeWithBackpressure(data: string) { await writer.ready; // 等待缓冲区可写 await writer.write(encoder.encode(data));}2. 分块策略
合理控制每次发送的数据量,避免过小(频繁网络开销)或过大(失去流式效果):
let buffer = '';const CHUNK_SIZE = 50; // 每 50 个字符发送一次
for (const char of response) { buffer += char; if (buffer.length >= CHUNK_SIZE) { await writeWithBackpressure(`data: ${JSON.stringify({ content: buffer })}\n\n`); buffer = ''; }}3. 连接管理
实现心跳检测,防止连接意外断开:
// 服务端定期发送心跳const heartbeatInterval = setInterval(() => { writer.write(encoder.encode(': heartbeat\n\n'));}, 30000);
// 清理process.on('exit', () => clearInterval(heartbeatInterval));实战案例:Qwen Chatbot 中的集成
在 Qwen Chatbot 项目中,我们将以上技术整合到了真实的 AI 对话系统中:
- 消息组件集成:在 ChatWindow 组件中使用 TypeWriterEffect 显示助手回复
- 状态管理:使用全局状态管理器跟踪消息流
- 实时更新:SSE 流实时更新助手消息内容
- 打字效果:前端实现的打字机效果增强用户体验
import TypeWriterEffect from './TypeWriterEffect';
// ...
{messages.map((msg, index) => ( <div key={index} className={`${styles.message} ${styles[msg.role]}`}> <div className={styles.avatar}> {msg.role === 'user' ? '👤' : '🤖'} </div> <div className={styles.content}> {msg.role === 'assistant' ? ( <TypeWriterEffect text={msg.content} speed={20} /> ) : ( msg.content )} {msg.usage && ( <div className={styles.tokenInfo}> Tokens: {msg.usage.total_tokens} (Prompt: {msg.usage.prompt_tokens}, Completion: {msg.usage.completion_tokens}) </div> )} </div> </div>))}注意事项与最佳实践
- 超时处理:设置合理的超时时间,避免连接永久挂起
- 错误恢复:客户端应实现重试机制,处理网络波动
- 资源清理:确保 writer 和 reader 正确关闭,防止内存泄漏
- CORS 配置:跨域场景需要正确配置响应头
- 进度指示:提供明确的加载状态,让用户知道系统正在工作
- 打字机效果优化:不能依赖 SSE 返回粒度,必须在前端主动控制显示节奏
- API 兼容性:适配不同 LLM 提供商的 API 格式差异
总结
流式输出通过 SSE 和异步处理技术,将”等待-返回”的交互模式转变为”实时反馈”的体验。在 Qwen Chatbot 项目中,借助 Next.js 和 Web Streams API,我们优雅地实现了这一功能。无论是 AI 对话、数据处理还是实时日志,流式输出都能显著提升用户体验。
通过结合后端流式传输和前端打字机效果,我们实现了既高效又直观的用户交互体验。随着 Web 技术的发展,流式处理将成为构建现代 AI 应用的标配能力。掌握这项技术,让你的应用更加流畅、响应更加迅速,为用户带来更好的交互体验。