glownight

返回

流式输出异常中断处理机制#

1. 当前项目的异常处理架构#

┌─────────────────────────────────────────────────────────┐
│                    异常中断处理架构                        │
├─────────────────────────────────────────────────────────┤
│  AbortController ──→ 请求取消控制                        │
│  activeRequestRef ──→ 状态追踪与恢复                      │
│  flushPendingContent ──→ 缓冲区强制刷新                   │
│  Error Boundary ──→ 错误捕获与降级                        │
└─────────────────────────────────────────────────────────┘
plaintext

2. 核心机制详解#

2.1 AbortController - 请求取消控制#

// 创建可取消的请求控制器
const controller = new AbortController();

// 发起请求时传入signal
const response = await fetch(apiURL, {
  method: "POST",
  headers: { ... },
  body: JSON.stringify({ ... }),
  signal: controller.signal,  // 关键:绑定取消信号
});

// 用户点击"停止"时取消请求
function handleStop() {
  const activeRequest = activeRequestRef.current;
  if (!activeRequest) return;
  
  activeRequest.controller.abort();  // 触发取消
  activeRequest.onAbort?.();         // 执行清理回调
  setLoading(false);
  activeRequestRef.current = null;
}
typescript

2.2 状态追踪与恢复机制#

// 使用useRef追踪活跃请求状态
const activeRequestRef = useRef<ActiveRequestState | null>(null);

// 每次请求时保存状态
activeRequestRef.current = {
  controller,                    // 取消控制器
  latestMessages: newMessages,   // 最新消息状态
  syncMessages,                  // 状态同步函数
  onAbort?: () => void,          // 取消回调
};

// 异常时恢复最新状态
if (controller.signal.aborted) {
  const latestMessages = 
    activeRequestRef.current?.controller === controller
      ? activeRequestRef.current.latestMessages  // 获取中断前的状态
      : newMessages;
  
  // 清理空消息
  const cleanedMessages = stripEmptyAssistantMessages(latestMessages);
  
  if (cleanedMessages !== latestMessages) {
    syncMessages(cleanedMessages);  // 同步清理后的状态
  }
  return;
}
typescript

2.3 缓冲区强制刷新机制#

// 关键:确保缓冲区内容不丢失
let flushPendingContentForCleanup: (() => void) | null = null;

const flushPendingContent = () => {
  if (!pendingContent) return;
  
  assistantMessage.content += pendingContent;  // 追加缓冲区内容
  pendingContent = "";                          // 清空缓冲区
  syncAssistantMessage();                       // 同步到UI
};

// 保存清理函数引用
flushPendingContentForCleanup = flushPendingContent;

// 异常时强制刷新
} catch (error) {
  flushPendingContentForCleanup?.();  // 确保内容不丢失
  
  if (controller.signal.aborted) {
    // 用户主动取消的处理
    return;
  }
  
  // 网络错误的处理
  const errorMsg: Message = {
    id: uid(),
    role: "assistant",
    content: `Request failed: ${formatRequestError(error)}`,
  };
  syncMessages([...newMessages, errorMsg]);
}
typescript

3. 断点续传的实现策略#

3.1 消息ID追踪机制#

// 为每条消息生成唯一ID
const assistantId = uid();  // 例如:"msg_abc123"

// 创建消息对象
const assistantMessage: Message = {
  id: assistantId,
  role: "assistant",
  content: "",
};

// 增量更新而非替换
assistantMessage.content += pendingContent;
syncMessages([...newMessages, { ...assistantMessage }]);
typescript

3.2 乐观更新与状态同步#

// 乐观更新:立即显示用户消息
const newMessages = [...sessionMessages, userMsg];
syncMessages(newMessages);  // 立即更新UI

try {
  // 发起请求
  const response = await fetch(...);
  
  // 流式处理过程中持续同步
  const syncMessages = (messages: Message[]) => {
    if (activeRequestRef.current?.controller === controller) {
      activeRequestRef.current.latestMessages = messages;  // 保存最新状态
    }
    updateCurrentSession(messages);  // 更新全局状态
  };
  
} catch (error) {
  // 异常时回滚或恢复
  flushPendingContentForCleanup?.();
  // ...
}
typescript

4. 平滑降级策略#

4.1 网络错误处理#

function formatRequestError(error: unknown): string {
  if (error instanceof TypeError && error.message.includes("fetch")) {
    return "网络连接失败,请检查网络设置";
  }
  
  if (error instanceof Response) {
    switch (error.status) {
      case 401:
        return "API密钥无效,请检查配置";
      case 429:
        return "请求过于频繁,请稍后再试";
      case 500:
        return "服务器内部错误";
      default:
        return `请求失败 (${error.status})`;
    }
  }
  
  return String(error);
}
typescript

4.2 搜索增强的降级处理#

// 当网络搜索失败时,回退到基础模型回答
const runFallback = async (note: string, sources: WebSearchSource[] = []) => {
  const fallbackProgress: Message = {
    ...progressMessage,
    content: formatWebSearchProgress({
      stage: "fallback",
      query: searchDecision.query,
      note,
      sources,
    }),
  };
  
  syncMessages([...sessionMessages, userMsg, fallbackProgress]);

  try {
    // 使用基础模型回答(不依赖网络搜索)
    const fallbackAnswer = await requestAssistantText(apiConfig, controller, {
      model: apiConfig.model,
      messages: [...historyMessages, userRequestMessage],
    });
    
    // 显示降级回答
    syncMessages([
      ...sessionMessages,
      userMsg,
      fallbackProgress,
      { id: assistantId, role: "assistant", content: fallbackAnswer },
    ]);
    
  } catch (fallbackError) {
    if (controller.signal.aborted) return;
    
    // 最终错误提示
    const errorMsg: Message = {
      id: uid(),
      role: "assistant",
      content: `请求失败:${formatRequestError(fallbackError)}`,
    };
    syncMessages([...sessionMessages, userMsg, fallbackProgress, errorMsg]);
  }
};
typescript

5. 最终一致性保证#

5.1 空消息清理#

// 检查消息是否为空
function isEmptyAssistantMessage(message: Message): boolean {
  return message.role === "assistant" && !message.content.trim();
}

// 清理空消息,确保状态一致性
function stripEmptyAssistantMessages(messages: Message[]): Message[] {
  const cleanedMessages = messages.filter(
    (message) => !isEmptyAssistantMessage(message)
  );
  return cleanedMessages;
}

// 中断时清理
const cleanedMessages = stripEmptyAssistantMessages(latestMessages);
if (cleanedMessages !== latestMessages) {
  syncMessages(cleanedMessages);
}
typescript

5.2 状态验证机制#

// 同步前验证状态有效性
const syncMessages = (messages: Message[]) => {
  // 验证是当前活跃请求
  if (activeRequestRef.current?.controller === controller) {
    activeRequestRef.current.latestMessages = messages;
  }
  
  // 验证消息完整性
  const validMessages = messages.filter(msg => 
    msg.id && msg.role && typeof msg.content === 'string'
  );
  
  updateCurrentSession(validMessages);
};
typescript

6. 增强的断点续传方案(建议实现)#

6.1 消息分片存储#

// 将长消息分片存储,支持断点续传
interface MessageChunk {
  id: string;
  messageId: string;
  index: number;
  content: string;
  isComplete: boolean;
}

// 恢复时合并分片
function mergeMessageChunks(chunks: MessageChunk[]): string {
  return chunks
    .sort((a, b) => a.index - b.index)
    .map(c => c.content)
    .join('');
}
typescript

6.2 重连机制#

// 自动重连策略
class StreamReconnectManager {
  private retryCount = 0;
  private maxRetries = 3;
  private retryDelay = 1000; // 1秒
  
  async connectWithRetry(
    requestFn: () => Promise<Response>,
    onChunk: (chunk: string) => void
  ): Promise<void> {
    try {
      const response = await requestFn();
      this.retryCount = 0; // 重置重试计数
      await this.processStream(response, onChunk);
    } catch (error) {
      if (this.retryCount < this.maxRetries) {
        this.retryCount++;
        await this.delay(this.retryDelay * this.retryCount);
        return this.connectWithRetry(requestFn, onChunk);
      }
      throw error;
    }
  }
  
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
typescript

7. 总结#

机制作用实现方式
AbortController请求取消控制用户主动停止
activeRequestRef状态追踪useRef保存最新状态
flushPendingContent缓冲区保护异常时强制刷新
乐观更新UI即时响应先更新UI再请求
空消息清理状态一致性过滤无效消息
降级策略服务可用性搜索失败回退基础模型

当前项目已经实现了基础的异常中断处理,但在自动重连消息分片续传方面还有优化空间。建议根据实际业务需求,逐步实现更完善的断点续传机制。

流式输出异常中断处理机制
作者 glownight
发布于 2026年4月29日