skip to content
logo

Search

使用流式传输减少用户等待时间

为什么要使用流式传输

由于大模型通常是需要实时推理的,Web 应用调用大模型时,它的标准模式是浏览器提交数据,服务端完成推理,然后将结果以 JSON 数据格式通过标准的 HTTP 协议返回给前端。

但是这么做有一个问题,主要是推理所花费的时间和问题复杂度、以及生成的 token 数量有关。如果我们提出稍微复杂一点的要求,比如编写一本小说的章节目录,或者撰写一篇千字的作文,那么 AI 推理的时间会大大增加,这在具体应用中就带来一个显而易见的问题,那就是用户等待的时间很长。

不过你应该已经发现,我们在使用线上大模型服务时,不管是哪一家大模型,通常前端的响应速度并没有太慢,这正是因为它们默认采用了流式(streaming)传输,不必等到整个推理完成再将内容返回,而是可以将逐个 token 实时返回给前端,这样就大大减少了响应时间。

使用流式(streaming)传输减少等待时间

大多数文本模型,都支持使用流式传输来返回内容。在流式传输下,在模型推理过程中,生成的 token 会及时返回,而不用等待推理过程完全结束。

创建一个 Vue+Vite+TypeScript 项目

<script setup lang="ts">
import { ref } from 'vue';

const question = ref('讲一个关于中国龙的故事');
const content = ref('');
const stream = ref(true);

const update = async () => {
  if(!question) return;
  content.value = "思考中...";

  const endpoint = 'https://api.deepseek.com/chat/completions';
  const headers = {
      'Content-Type': 'application/json',
      // 需要配置 deepseek api key
      Authorization: `Bearer ${import.meta.env.VITE_DEEPSEEK_API_KEY}`
  };

  const response = await fetch(endpoint, {
    method: 'POST',
    headers: headers,
    body: JSON.stringify({
      model: 'deepseek-chat',
      messages: [{ role: 'user', content: question.value }],
      stream: stream.value,// 是否采用流式传输
    })
  });
	
  if(stream.value) {
    content.value = '';

    const reader = response.body?.getReader();
    const decoder = new TextDecoder();
    let done = false;
    let buffer = '';

    while (!done) {
      const { value, done: doneReading } = await (reader?.read() as Promise<{ value: any; done: boolean }>);
      done = doneReading;
      const chunkValue = buffer + decoder.decode(value);
      buffer = '';

      const lines = chunkValue.split('\n').filter((line) => line.startsWith('data: '));

      for (const line of lines) {
        const incoming = line.slice(6);
        if(incoming === '[DONE]') {
          done = true;
          break;
        }
        try {
          const data = JSON.parse(incoming);
          const delta = data.choices[0].delta.content;
          if(delta) content.value += delta;
        } catch(ex) {
          buffer += `data: ${incoming}`;
        }
      }
    }
  } else {
    const data = await response.json();
    content.value = data.choices[0].message.content;
  }
}
</script>

<template>
  <div class="flex flex-col items-start justify-start h-screen text-sm">
    <div class="flex items-center gap-2 mb-4">
      <label class="text-gray-700">输入:</label>
      <input 
        class="w-48 px-3 py-1 border border-gray-300 rounded focus:outline-none focus:ring-2 focus:ring-blue-500"
        v-model="question"
      />
      <button 
        @click="update"
        class="px-3 py-1 bg-blue-500 text-white rounded hover:bg-blue-600 transition-colors"
      >
        提交
      </button>
    </div>
    <div class="w-full mt-3 p-4 border border-gray-200 rounded min-h-[300px]">
      <div class="mb-2 flex items-center">
        <label class="mr-2 text-gray-700">Streaming</label>
        <input 
          type="checkbox" 
          v-model="stream"
          class="h-4 w-4 text-blue-600 rounded focus:ring-blue-500"
        />
      </div>
      <div class="text-left whitespace-pre-wrap">{{ content }}</div>
    </div>
  </div>
</template>

在浏览器处理请求的时候,会通过 Web 标准的 Streams API 来处理数据,一个成功的流式 fetch 请求返回的响应体可以暴露为 ReadableStream,具体处理逻辑如下:

  1. 首先,我们利用 ReadableStream API 通过 getReader() 获取一个读取器
  2. 同时创建一个 TextDecoder 准备对二进制数据进行解码
  3. 然后我们设置控制流标志 done,以及一个 buffer 变量来缓存数据,因为某些情况下,Stream 数据返回给前端时,不一定传输完整。
  4. 接着我们开始循环读取数据,通过 TextDecoder 解析数据,将数据转换成文本并按行拆分。

因为 API 返回流式数据的协议(SSE 规范)是每一条数据以 “data:” 开头,后续是一个有效的 JSON 或者[DONE]表示传输结束,所以我们要对每一行以"data:"开头的数据进行处理。

如果数据传输完整,且不是[DONE],那么它就是合法 JSON,我们从中读取 data.choices[0].delta.content,就是需要增量更新的内容,否则说明数据不完整,将它存入缓存,以便后续继续处理。

这样我们就实现了数据的流式传输和浏览器的动态接收。

使用 Server-Sent Events

上面的做法虽然可以直接使用流式数据,但是处理起来还是略为繁琐。

实际上 Deepseek API 和其他大部分兼容 OpenAI 的平台,AI 返回的流式输出数据都是符合标准的Server-Sent Events(SSE)规范的,现代浏览器几乎都支持更简单的 SSE API,只不过我们目前暂时无法在前端直接使用它。

主要原因是,根据标准,SSE 的底层只支持 HTTP GET,并且不能发送自定义的 Header,而我们的授权却需要将 API Key 通过 Authorization Header 发送,而且必须使用 POST 请求。

尽管如此,并不意味着我们前端就不能使用 SSE 来处理流式输出,而是我们需要创建一个 BFF 层,通过 Node Server 来做中转。

在项目中安装依赖包 dotenv 和 express,然后在项目根目录下添加如下 server.js 文件:

import * as dotenv from 'dotenv'
import express from 'express';

dotenv.config({
  path: ['.env.local', '.env']
})

const openaiApiKey = process.env.VITE_DEEPSEEK_API_KEY;
const app = express();
const port = 3000;
const endpoint = 'https://api.deepseek.com/v1/chat/completions';

// SSE 端点
app.get('/stream', async (req, res) => {
    // 设置响应头部
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders(); // 发送初始响应头
  
    try {
      // 发送 OpenAI 请求
      const response = await fetch(
        endpoint,
        {
            method: 'POST',
            headers: {
                'Authorization': `Bearer ${openaiApiKey}`,
            },
            body: JSON.stringify({
                model:'deepseek-chat', // 选择你使用的模型
                messages: [{ role: 'user', content: req.query.question }],
                stream: true, // 开启流式响应
            })
        }
      );
  
      if (!response.ok) {
        throw new Error('Failed to fetch from OpenAI');
      }

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let done = false;
      let buffer = '';

          // 读取流数据并转发到客户端
      while (!done) {
        const { value, done: doneReading } = await reader.read();
        done = doneReading;
        const chunkValue = buffer + decoder.decode(value, { stream: true });
        buffer = '';
  
        // 按行分割数据,每行以 "data: " 开头,并传递给客户端
        const lines = chunkValue.split('\n').filter(line => line.trim() && line.startsWith('data: '));
        for (const line of lines) {
            const incoming = line.slice(6);
            if(incoming === '[DONE]') {
              done = true;
              break;
            }
            try {
              const data = JSON.parse(incoming);
              const delta = data.choices[0].delta.content;
              if(delta) res.write(`data: ${delta}\n\n`); // 发送数据到客户端
            } catch(ex) {
              buffer += `data: ${incoming}`;
            }
        }
      }
  
      res.write('event: end\n'); // 发送结束事件
      res.write('data: [DONE]\n\n'); // 通知客户端数据流结束
      res.end(); // 关闭连接
  
    } catch (error) {
      console.error('Error fetching from OpenAI:', error);
      res.write('data: Error fetching from OpenAI\n\n');
      res.end();
    }
  });
  
  // 启动服务器
  app.listen(port, () => {
    console.log(`Server running on http://localhost:${port}`);
  });

完成后,我们在终端启动服务:node server.js

这个 server.js 的主要作用是在 server 端处理大模型 API 的流式响应,并将数据仍以兼容 SSE(以"data: "开头)的形式逐步发送给浏览器端。

现在我们在 IDE 中可以访问 http://localhost:3000/stream?question=hello 进行测试。

为了在前端页面上访问,我们可以通过配置 vite.config.js 文件的 server,将请求转发到 /api/stream

import { defineConfig } from 'vite';
import vue from '@vitejs/plugin-vue';
import vueDevTools from 'vite-plugin-vue-devtools';

// https://vitejs.dev/config/
export default defineConfig({
  server: {
    allowedHosts: true,
    port: 4399,
    proxy: {
      '/api': {
        target: 'http://localhost:3000',
        secure: false,
        rewrite: path => path.replace(/^\/api/, ''),
      },
    },
  },
  plugins: [
    vue(),
    vueDevTools(),
  ],
});

这样我们前端组件中的方法就可以精简为

const update = async () => {
	// 省略...
  if(stream.value) {
    content.value = '';
    const eventSource = new EventSource(`${endpoint}?question=${question.value}`);
    eventSource.addEventListener("message", function(e: any) {
      content.value += e.data;
    });
    eventSource.addEventListener('end', () => {  eventSource.close();});
  } else {
    const response = await fetch(endpoint, {
      method: 'POST',
      headers: headers,
      body: JSON.stringify({
        model: 'moonshot-v1-8k',
        messages: [{ role: 'user', content: question.value }],
        stream: stream.value,
      })
    });
    const data = await response.json();
    content.value = data.choices[0].message.content;
  }
}

注意和前面直接通过 Streams API 处理数据相比,有了 server 端处理转发后,浏览器只需使用 SSE。

这不仅仅让前端代码实现变得简洁很多,而且 SSE 在浏览器内置了自动重连机制。这意味着当网络、服务器或者客户端连接出现问题,恢复后将自动完成重新连接,不需要用户主动刷新页面,这让 SSE 特别适合长时间保持连接的应用场景。此外,SSE 还支持通过 lastEventId 来支持数据的续传,这样在错误恢复时,能大大节省数据传输的带宽和接收数据的响应时间。