/
Update
7 min read
中文 流式输出异常中断处理机制
流式输出异常中断处理机制#
1. 当前项目的异常处理架构#
┌─────────────────────────────────────────────────────────┐
│ 异常中断处理架构 │
├─────────────────────────────────────────────────────────┤
│ AbortController ──→ 请求取消控制 │
│ activeRequestRef ──→ 状态追踪与恢复 │
│ flushPendingContent ──→ 缓冲区强制刷新 │
│ Error Boundary ──→ 错误捕获与降级 │
└─────────────────────────────────────────────────────────┘plaintext2. 核心机制详解#
2.1 AbortController - 请求取消控制#
// 创建可取消的请求控制器
const controller = new AbortController();
// 发起请求时传入signal
const response = await fetch(apiURL, {
method: "POST",
headers: { ... },
body: JSON.stringify({ ... }),
signal: controller.signal, // 关键:绑定取消信号
});
// 用户点击"停止"时取消请求
function handleStop() {
const activeRequest = activeRequestRef.current;
if (!activeRequest) return;
activeRequest.controller.abort(); // 触发取消
activeRequest.onAbort?.(); // 执行清理回调
setLoading(false);
activeRequestRef.current = null;
}typescript2.2 状态追踪与恢复机制#
// 使用useRef追踪活跃请求状态
const activeRequestRef = useRef<ActiveRequestState | null>(null);
// 每次请求时保存状态
activeRequestRef.current = {
controller, // 取消控制器
latestMessages: newMessages, // 最新消息状态
syncMessages, // 状态同步函数
onAbort?: () => void, // 取消回调
};
// 异常时恢复最新状态
if (controller.signal.aborted) {
const latestMessages =
activeRequestRef.current?.controller === controller
? activeRequestRef.current.latestMessages // 获取中断前的状态
: newMessages;
// 清理空消息
const cleanedMessages = stripEmptyAssistantMessages(latestMessages);
if (cleanedMessages !== latestMessages) {
syncMessages(cleanedMessages); // 同步清理后的状态
}
return;
}typescript2.3 缓冲区强制刷新机制#
// 关键:确保缓冲区内容不丢失
let flushPendingContentForCleanup: (() => void) | null = null;
const flushPendingContent = () => {
if (!pendingContent) return;
assistantMessage.content += pendingContent; // 追加缓冲区内容
pendingContent = ""; // 清空缓冲区
syncAssistantMessage(); // 同步到UI
};
// 保存清理函数引用
flushPendingContentForCleanup = flushPendingContent;
// 异常时强制刷新
} catch (error) {
flushPendingContentForCleanup?.(); // 确保内容不丢失
if (controller.signal.aborted) {
// 用户主动取消的处理
return;
}
// 网络错误的处理
const errorMsg: Message = {
id: uid(),
role: "assistant",
content: `Request failed: ${formatRequestError(error)}`,
};
syncMessages([...newMessages, errorMsg]);
}typescript3. 断点续传的实现策略#
3.1 消息ID追踪机制#
// 为每条消息生成唯一ID
const assistantId = uid(); // 例如:"msg_abc123"
// 创建消息对象
const assistantMessage: Message = {
id: assistantId,
role: "assistant",
content: "",
};
// 增量更新而非替换
assistantMessage.content += pendingContent;
syncMessages([...newMessages, { ...assistantMessage }]);typescript3.2 乐观更新与状态同步#
// 乐观更新:立即显示用户消息
const newMessages = [...sessionMessages, userMsg];
syncMessages(newMessages); // 立即更新UI
try {
// 发起请求
const response = await fetch(...);
// 流式处理过程中持续同步
const syncMessages = (messages: Message[]) => {
if (activeRequestRef.current?.controller === controller) {
activeRequestRef.current.latestMessages = messages; // 保存最新状态
}
updateCurrentSession(messages); // 更新全局状态
};
} catch (error) {
// 异常时回滚或恢复
flushPendingContentForCleanup?.();
// ...
}typescript4. 平滑降级策略#
4.1 网络错误处理#
function formatRequestError(error: unknown): string {
if (error instanceof TypeError && error.message.includes("fetch")) {
return "网络连接失败,请检查网络设置";
}
if (error instanceof Response) {
switch (error.status) {
case 401:
return "API密钥无效,请检查配置";
case 429:
return "请求过于频繁,请稍后再试";
case 500:
return "服务器内部错误";
default:
return `请求失败 (${error.status})`;
}
}
return String(error);
}typescript4.2 搜索增强的降级处理#
// 当网络搜索失败时,回退到基础模型回答
const runFallback = async (note: string, sources: WebSearchSource[] = []) => {
const fallbackProgress: Message = {
...progressMessage,
content: formatWebSearchProgress({
stage: "fallback",
query: searchDecision.query,
note,
sources,
}),
};
syncMessages([...sessionMessages, userMsg, fallbackProgress]);
try {
// 使用基础模型回答(不依赖网络搜索)
const fallbackAnswer = await requestAssistantText(apiConfig, controller, {
model: apiConfig.model,
messages: [...historyMessages, userRequestMessage],
});
// 显示降级回答
syncMessages([
...sessionMessages,
userMsg,
fallbackProgress,
{ id: assistantId, role: "assistant", content: fallbackAnswer },
]);
} catch (fallbackError) {
if (controller.signal.aborted) return;
// 最终错误提示
const errorMsg: Message = {
id: uid(),
role: "assistant",
content: `请求失败:${formatRequestError(fallbackError)}`,
};
syncMessages([...sessionMessages, userMsg, fallbackProgress, errorMsg]);
}
};typescript5. 最终一致性保证#
5.1 空消息清理#
// 检查消息是否为空
function isEmptyAssistantMessage(message: Message): boolean {
return message.role === "assistant" && !message.content.trim();
}
// 清理空消息,确保状态一致性
function stripEmptyAssistantMessages(messages: Message[]): Message[] {
const cleanedMessages = messages.filter(
(message) => !isEmptyAssistantMessage(message)
);
return cleanedMessages;
}
// 中断时清理
const cleanedMessages = stripEmptyAssistantMessages(latestMessages);
if (cleanedMessages !== latestMessages) {
syncMessages(cleanedMessages);
}typescript5.2 状态验证机制#
// 同步前验证状态有效性
const syncMessages = (messages: Message[]) => {
// 验证是当前活跃请求
if (activeRequestRef.current?.controller === controller) {
activeRequestRef.current.latestMessages = messages;
}
// 验证消息完整性
const validMessages = messages.filter(msg =>
msg.id && msg.role && typeof msg.content === 'string'
);
updateCurrentSession(validMessages);
};typescript6. 增强的断点续传方案(建议实现)#
6.1 消息分片存储#
// 将长消息分片存储,支持断点续传
interface MessageChunk {
id: string;
messageId: string;
index: number;
content: string;
isComplete: boolean;
}
// 恢复时合并分片
function mergeMessageChunks(chunks: MessageChunk[]): string {
return chunks
.sort((a, b) => a.index - b.index)
.map(c => c.content)
.join('');
}typescript6.2 重连机制#
// 自动重连策略
class StreamReconnectManager {
private retryCount = 0;
private maxRetries = 3;
private retryDelay = 1000; // 1秒
async connectWithRetry(
requestFn: () => Promise<Response>,
onChunk: (chunk: string) => void
): Promise<void> {
try {
const response = await requestFn();
this.retryCount = 0; // 重置重试计数
await this.processStream(response, onChunk);
} catch (error) {
if (this.retryCount < this.maxRetries) {
this.retryCount++;
await this.delay(this.retryDelay * this.retryCount);
return this.connectWithRetry(requestFn, onChunk);
}
throw error;
}
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}typescript7. 总结#
| 机制 | 作用 | 实现方式 |
|---|---|---|
| AbortController | 请求取消控制 | 用户主动停止 |
| activeRequestRef | 状态追踪 | useRef保存最新状态 |
| flushPendingContent | 缓冲区保护 | 异常时强制刷新 |
| 乐观更新 | UI即时响应 | 先更新UI再请求 |
| 空消息清理 | 状态一致性 | 过滤无效消息 |
| 降级策略 | 服务可用性 | 搜索失败回退基础模型 |
当前项目已经实现了基础的异常中断处理,但在自动重连和消息分片续传方面还有优化空间。建议根据实际业务需求,逐步实现更完善的断点续传机制。