添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

手摸手带你实现前端乞丐版的 ChatGPT

589 人阅读 8 分钟阅读

一开始看到 chatgpt 的流式渲染,有点好奇流式渲染是如何实现的,无意之间发现 vercel 的库:ai,仔细学习了它的代码,写的小而精,把它看明白了之后,想着写一篇文章来输出一下。 所以我就根据 ai 这个库,来一步步带大家来实现前端乞丐版 chatgpt ,把这篇文章看完之后,也可以去看看 ai 这个库,代码是写的真不错(会发现代码都是抄它的,哈哈哈哈)!!!

blog-header-cover
一开始看到 chatgpt 的流式渲染,有点好奇流式渲染是如何实现的,无意之间发现 vercel 的库:ai,仔细学习了它的代码,写的小而精,把它看明白了之后,想着写一篇文章来输出一下。
所以我就根据 ai 这个库,来一步步带大家来实现前端乞丐版 chatgpt ,把这篇文章看完之后,也可以去看看 ai 这个库,代码是写的真不错(会发现代码都是抄它的,哈哈哈哈)!!!
本文会使用 Next13,不熟悉也没关系,用到的 api 不多,不懂的 api 可以查看 Next13 文档
注意:需要申请有一个 openai 的 apiKey ,不然就无法调用接口哦。
效果展示:
basic-demo.gif
basic-demo.gif

什么是网络流

先看看一个概念:网络流,平时可能用不到。流是一种用于访问数据的数据结构,比如说:文件、接口返回的数据等等。
使用流有两个好处:
  • 可以处理大量数据,流可以将它们分成更小的部分(chunk),可以一次处理一个(chunk)。
  • 可以使用相同的数据结构、流,同时处理不同的数据,这使得代码变得更加复用。
在网络流中,一个 chunk 通常是:
  • 文本流:string
  • 二进制流:Uint8Arrays
网络流主要有三种:
  • ReadableStream:用于从数据源中读取数据。执行此操作的代码成为消费者。
  • WritableStream:用户将数据写入。执行此操作的代码成为生产者。
  • TransformStream 由两个流组成:
    • 它从其可写端(WritableStream)接收输入。
    • 它将输出发送到它的可读端,一个 ReadableStream。
    本文中只会使用到 ReadableStream 和 TransformStream。

    ReadableStream

    ReadableStream 可以从各种来源读取数据块,类型声明如下:
    interface ReadableStream<TChunk> {
    getReader(): ReadableStreamDefaultReader<TChunk>;
    pipeThrough<TChunk2>(
    transform: ReadableWritablePair<TChunk2, TChunk>,
    options?: StreamPipeOptions
    ): ReadableStream<TChunk2>;
    readonly locked: boolean;
    // 忽略
    // [Symbol.asyncIterator](): AsyncIterator<TChunk>;
    // cancel(reason?: any): Promise<void>;
    // pipeTo(
    // destination: WritableStream<TChunk>,
    // options?: StreamPipeOptions
    // ): Promise<void>;
    // tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>];
    }
    这三个属性的作用是:
    • getReader() :返回一个 Reader,可以从 ReadableStream 读取的对象,返回的 Reader 类似于迭代器。
    • locked :每个 ReadableStream 一次只能有一个活动的 Reader,当 Reader 在使用时,ReadableStream 被锁定并且 getReader() 不能被调用。
    • pipeThrough() :将其 ReadableStream 连接到 ReadableWritablePair(一个 TransformStream)。它返回一个新的 ReadableStream(类比一下:把它理解成一个数组的 map 方法)。
    下面来看看 getReader() 的返回类型:
    interface ReadableStreamGenericReader {
    cancel(reason?: any): Promise<void>;
    // 忽略
    // readonly closed: Promise<undefined>;
    }
    interface ReadableStreamDefaultReader<TChunk>
    extends ReadableStreamGenericReader {
    releaseLock(): void;
    read(): Promise<ReadableStreamReadResult<TChunk>>;
    }
    interface ReadableStreamReadResult<TChunk> {
    done: boolean;
    value: TChunk | undefined;
    }
    • cancel() :在一个活动的 Reader 中,这个方法取消关联的 ReadableStream。
    • releaseLock() :停用 Reader 并解锁流。
    • read() 返回来两个属性的 ReadableStreamReadResult 的 Promise:
      • done :布尔值,false 表示可以读取,true 表示最后一个块。
      • value :块(chunk)。
      师傅,别念 api 了,再念人都要傻了,赶紧来一个 demo 吧。
      以下是通过 getReader 方式来读取 ReadableStream 的小例子。
      const reader = readableStream.getReader(); // (A)
      console.log(readableStream.locked); // true (B)
      let result = '';
      try {
      while (true) {
      const { done, value } = await reader.read(); // (C)
      if (done) {
      break;
      }
      result += value; // (D)
      }
      } finally {
      reader.releaseLock(); // (E)
      }
      console.log('result', result);
      • A 行:不能直接读取 readableStream,需要调用 getReader() 来获取 Reader。
      • B 行:在 getReader() 之后,readableStream 被锁定,所以 B 行打印的是 true,如果想再次调用 getReader() ,必须调用 releaseLock() (E 行)。
      • C 行: read() 返回属性 done 和 value,如果 done 为 true,表示是最后一个块,
      • D 行:可以对返回的 value 进行操作,这里是将返回的 value 全部都加在一个字符串里。

      通过包装将数据源转化为 ReadableStream

      如果想通过 ReadableStream 读取外部源,可以将其包装在适配器对象中并将该对象传递给构造函数 ReadableStream。
      new ReadableStream(underlyingSource?, queuingStrategy?)
      以下是类型声明:
      interface UnderlyingSource<TChunk> {
      start?(controller: ReadableStreamController<TChunk>): void | Promise<void>;
      // 忽略
      // pull?(controller: ReadableStreamController<TChunk>): void | Promise<void>;
      // cancel?(reason?: any): void | Promise<void>;
      // type: 'bytes' | undefined;
      // autoAllocateChunkSize: bigint;
      }
      • start(controller) 调用构造函数后立即调用 start 方法。
      controller 的参数类型如下:
      type ReadableStreamController<TChunk> =
      | ReadableStreamDefaultController<TChunk>
      | ReadableByteStreamController<TChunk>; // 先忽略
      interface ReadableStreamDefaultController<TChunk> {
      enqueue(chunk?: TChunk): void;
      close(): void;
      // 忽略
      // readonly desiredSize: number | Null;
      // error(err?: any): void;
      }
      • enqueue(chunk) :添加 chunk 到 ReadableStream 的内部队列。
      • close() :关闭 ReadableStream,消费者仍然可以清空队列,在那之后,流结束。

      自定义 ReadableStream demo

      async function test14() {
      const readableStream = new ReadableStream({
      start(controller) {
      controller.enqueue('First Line\n'); // (A)
      controller.enqueue('Second Line\n'); // (B)
      controller.close(); // (C)
      },
      });
      for await (const chunk of readableStream) {
      console.log(chunk);
      }
      }
      test14();
      // First Line
      // Second Line
      ReadableStream 是异步可迭代的,可以使用 for-await-of 来进行迭代。
      使用控制器创建一个包含两个块的流(A 和 B 行),关闭流(C 行)很重要,否则 for-await-of 永远不会结束。

      TransformStream

      • 通过其可写端(WritableStream)接受输入。
      • 然后它可能会或可能不会转换输入。
      • 结果可以通过 ReadableStream 读取,它是可读的。
      使用 TransformStream 最常见的方式是 pipeThrough
      const transformStream = readableStream.pipeThrough(transformStream);
      .pipeThrough() 将 readableStream 传输到 transformStream 的可写端,并进行转换返回其可读端。
      换句话说: 创建了一个新的 ReadableStream,它是 ReadableStream 的转换版本,类似于数组的 map。
      一个简单的 demo:
      async function test21() {
      const encoder = new TextEncoder();
      const readableByteStream = new ReadableStream({
      start(controller) {
      controller.enqueue(encoder.encode('hello\n'));
      controller.enqueue(encoder.encode('world\n'));
      },
      });
      const readableStream = readableByteStream.pipeThrough(
      new TextDecoderStream('utf-8')
      );
      for await (const stringChunk of readableStream) {
      console.log(stringChunk);
      }
      }
      test21();
      TextEncoder.encode():将字符串作为输入,并返回 Uint8Array 包含 UTF-8 编码的文本。
      使用了内置 TransformStream: TextDecoderStream() ,作用就是将接收到的二进制流转换为可读的文本流(Uint8Array -> string)。

      自定义 TransformStream

      跟上面的 ReadableStream 类似,如果要自定义 TransformStream,也可以传递适配器对象给构造函数 TransformStream。
      它具有以下类型:
      interface TransformStream<InChunk, OutChunk> {
      start?(
      controller: TransformStreamDefaultController<OutChunk>
      ): void | Promise<void>;
      transform?(
      chunk: InChunk,
      controller: TransformStreamDefaultController<OutChunk>
      ): void | Promise<void>;
      // 忽略
      // flush?(
      // chunk: InChunk,
      // controller: TransformStreamDefaultController<OutChunk>
      // ): void | Promise<void>;
      }
      上面属性的解释:
      • start(controller) :在调用构造函数后立即被调用,可以在转换之前做一些准备。
      • transform(chunk, controller) 执行实际的转化 。接受一个输入块,并可以使用 controller 将一个或多个转换后的输出块排队。
      该 contrller 具有以下类型:
      interface TransformStreamDefaultController<OutChunk> {
      enqueue(chunk?: OutChunk): void;
      terminate(): void;
      // 忽略
      // readonly desiredSize: number | null;
      // error(err?: any): void;
      }
      • enqueue(chunk) :添加 chunk 到 TransformStream 的可读端(输出)。
      • terminate() :关闭 TransformStream 的可读端(输出)并在可写端(输入)出错。如果转换器对可写端(输入)的剩余块不感兴趣并想跳过他们,则可以使用它。

      小 demo

      说了一大堆 api,来一个简单的例子:
      async function test20() {
      // 创建一个 ReadableStream 对象
      const readableStream = new ReadableStream({
      start(controller) {
      controller.enqueue('hello');
      controller.enqueue('world');
      controller.close();
      },
      });
      // 创建一个 TransformStream 对象
      const transformer = new TransformStream({
      transform(chunk, controller) {
      // 对输入数据进行转换处理
      const transformedChunk = chunk.toUpperCase();
      // 将转换后的数据通过 controller.enqueue() 方法推送到输出流 ReadableStream
      controller.enqueue(transformedChunk);
      },
      });
      // 通过 TransformStream 进行处理流的转换
      const newReadableStream = readableStream.pipeThrough(transformer);
      // 使用 reader 方式来读取 ReadableStream
      const reader = newReadableStream.getReader();
      while (true) {
      const { done, value } = await reader.read();
      if (done) {
      break;
      }
      console.log(value); // HELLO WORLD
      }
      }
      test20();

      重新回顾 fetch API

      Fetch API 是一种用于获取和发送网络资源的现代 Web API。它提供了一种替代 XMLHttpRequest 的方式,可以更简单、更灵活地进行网络请求。
      Fetch API 使用 Promise 对象来返回请求结果,可以轻松地将其与 async/await 结合使用。
      简单的小例子:
      async function main() {
      const res = await fetch('https://example.com');
      const data = await res.json();
      console.log('data', data);
      }
      main();
      这里要用的是 fetch 返回的属性 body ,它返回的是 ReadableStream
      async function main() {
      const res = await fetch('https://example.com');
      const body = res.body; // ReadableStream
      const reader = body.getReader();
      while (true) {
      const { done, value } = await reader.read();
      if (done) {
      break;
      }
      console.log('value', value);
      }
      }
      main();
      用到了上面提到的 ReadableStream,也算是简单的回顾一下。
      需要注意的是: fetch.body 返回的是二进制流,后面会再提到
      讲到这里,终于把前置的知识熟悉一下,我知道你很急,但是你先别急。
      我们来进入实战环节。

      初始化项目

      首先使用 pnpm create next-app 初始化一个 Next13 项目。
      • What is your project named? openai-stream
      • Would you like to use TypeScript with this project? Yes
      • Would you like to use ESLint with this project? Yes
      • Would you like to use Tailwind CSS with this project? Yes
      • Would you like to use src/ directory with this project? No
      • Use App Router (recommended)? Yes
      • Would you like to customize the default import alias? No
      初始化之后就会安装 TypeScript、Eslint 和 Tailwind CSS。

      当个 UI 仔也挺好

      第一步就开始画 UI。
      UI 的话主要有两个部分:
      1. 消息列表展示。
      2. input 输入框。

      MessageCard 渲染信息

      创建类型文件 type.ts 定义关于 message 的类型。
      // types.ts
      export type Message = {
      id: string;
      createdAt?: Date;
      content: string;
      role: 'system' | 'user' | 'assistant';
      };
      MessageCard 组件用来渲染输入和 openai 返回的信息。
      import { Message } from '@/types';
      import classNames from 'classnames';
      interface MessageCardProps {
      message: Message;
      }
      type AvatarProps = Pick<Message, 'role'>;
      const Avatar = ({ role }: AvatarProps) => {
      const getName = () => (role === 'user' ? 'U' : 'AI');
      return (
      <span
      className={classNames(
      'w-6 h-6 inline-flex items-center justify-center rounded-full min-w-[24px]',
      role === 'user' ? 'bg-orange-300' : 'bg-green-400'
      )}
      >
      {getName()}
      </span>
      );
      };
      const MessageCard = ({ message }: MessageCardProps) => {
      return (
      <div key={message.id} className="flex items-center">
      <Avatar role={message.role} />
      <div className="ml-2">{message.content}</div>
      </div>
      );
      };
      export default MessageCard;

      基础页面 + input 输入框

      下一步画基础的页面和 input 输入框。
      直接在 app/page.tsx 里面书写即可。
      // app/page.tsx
      'use client';
      import MessageCard from './MessageCard';
      import { Message } from '@/types';
      const Chat = () => {
      const messages: Message[] = [
      { id: '1', content: 'hello', role: 'user' },
      { id: '2', content: 'world', role: 'assistant' },
      ];
      return (
      <div className="flex h-full flex-col w-full max-w-xl pb-36 pt-9 mx-auto stretch">
      <ul className="space-y-4">
      {messages.map((message) => (
      <MessageCard key={message.id} message={message} />
      ))}
      </ul>
      <form
      onSubmit={(e) => {
      e.preventDefault();
      }}
      >
      <input
      className="w-full p-3 focus-visible:outline-gray-300 border border-gray-300 rounded-md shadow-xl focus:shadow-2xl transition-all"
      placeholder="随便说点什么..."
      />
      </form>
      </div>
      );
      };
      export default Chat;
      先 mock 消息列表,看看展示效果咋样。
      mock-message.png
      mock-message.png
      在 app 目录下新建文件 app/api/chat/route.ts ,用来处理 api 请求。
      // app/api/chat/route.ts
      // GET 是书写规范!!!
      export async function GET() {
      const data = 'hello world';
      return NextResponse.json({ data });
      }
      可以直接访问 http://localhost:3000/api/chat 可以看到返回的数据。
      get-api-route-hello-world.png
      get-api-route-hello-world.png
      关于 Next.js 的 Route 可以查看 相关文档 ,不过多赘述。
      需要发送 POST 请求将 message 传递给 openai,通过 export function POST 就可以处理 POST 请求。
      // app/api/chat/route.ts
      export const runtime = 'edge'; // 如果要流式渲染则需要加上这一行
      export async function POST(req: Request) {
      const stream = AIStream();
      console.log('stream:', stream);
      return new StreamingTextResponse(stream);
      }
      libs/streaming-text-response.ts 就是对于 Response 的简单封装,将状态码置为 200。
      // libs/streaming-text-response.ts
      export class StreamingTextResponse extends Response {
      constructor(res: ReadableStream, init?: ResponseInit) {
      super(res as any, {
      ...init,
      status: 200,
      headers: {
      'Content-Type': 'text/plain; charset=utf-8',
      ...init?.headers,
      },
      });
      }
      }
      创建文件 libs/ai-stream.ts 用来处理网络流,使用 ReadableStream 先 mock 两条数据。
      // libs/ai-stream.ts
      export function AIStream(): ReadableStream {
      const stream = new ReadableStream({
      start(controller) {
      controller.enqueue('hello\n');
      controller.enqueue('world\n');
      controller.close();
      },
      });
      return stream;
      }

      hook: use-chat

      下一步写一个 hook 来进行页面交互,创建文件 hooks/use-chat.ts
      需要安装一些依赖包:nanoid 和 swr。
      • nanoid 是可以生成唯一 ID 的库。
      • swr 是用于数据请求和缓存的库。
      pnpm add nanoid swr
      将 CreateMessage 和 UseChatOptions 添加到 types.ts 文件,定义好 use-chat 的类型声明。
      // types.ts
      // 省略其他代码...
      export type CreateMessage = {
      id?: string;
      createdAt?: Date;
      content: string;
      role: 'system' | 'user' | 'assistant';
      };
      // type.ts
      export type UseChatOptions = {
      // 指定聊天功能的API地址,默认为'/api/chat'
      api?: string;
      // 指定聊天的唯一标识符,如果没有指定,则使用useId生成唯一的Hook ID
      id?: string;
      // 消息列表初始化内容,默认为空数组
      initialMessages?: Message[];
      // 输入框的初始内容,默认为空字符串
      initialInput?: string;
      };
      // hooks/use-chat.ts
      export type UseChatHelpers = {
      // 当前消息列表
      messages: Message[];
      // 请求过程中可能出现的错误
      error: undefined | Error;
      // 向聊天中追加一条消息
      append: (
      message: Message | CreateMessage
      ) => Promise<string | null | undefined>;
      // 设置聊天消息列表
      setMessages: (messages: Message[]) => void;
      // 输入框内容
      input: string;
      // 设置输入框内容
      setInput: Dispatch<SetStateAction<string>>;
      // 处理输入框内容变化
      handleInputChange: (
      e: ChangeEvent<HTMLInputElement> | ChangeEvent<HTMLTextAreaElement>
      ) => void;
      // 处理表单提交
      handleSubmit: (e: FormEvent<HTMLFormElement>) => void;
      // 请求是否正在加载中
      isLoading: boolean;
      };
      export function useChat(options?: UseChatOptions): UseChatHelpers;

      use-chat 具体逻辑

      下面来写对应的 use-chat 具体逻辑:
      export function useChat({
      api = '/api/chat',
      id,
      initialInput = '',
      initialMessages = [],
      }: UseChatOptions = {}): UseChatHelpers {
      const hookId = useId();
      // 生成一个chatId
      const chatId = id || hookId;
      const { data, mutate } = useSWR<Message[]>([api, chatId], null, {
      fallbackData: initialMessages,
      });
      const messages = data!;
      // 用 ref 保存最新的消息列表
      const messagesRef = useRef<Message[]>(messages);
      useEffect(() => {
      messagesRef.current = messages;
      }, [messages]);
      // 处理input输入
      const [input, setInput] = useState(initialInput);
      const handleInputChange = useCallback(
      (e: ChangeEvent<HTMLInputElement> | ChangeEvent<HTMLTextAreaElement>) => {
      setInput(e.target.value);
      },
      []
      );
      // 设置消息列表
      const setMessages = useCallback(
      (messages: Message[]) => {
      mutate(messages, false);
      messagesRef.current = messages;
      },
      [mutate]
      );
      return {
      messages,
      setMessages,
      input,
      setInput,
      handleInputChange,
      };
      }
      使用 useSWR 来声明一个状态,第二个参数(fetcher)传入 null,表示不需要进行网络请求,可以把它当成本地状态来处理,返回的 mutate 函数可以对这个状态进行更新。

      小小的工具函数

      新建 utils.ts 文件用来保存两个工具函数: nanoid、createChunkDecoder。
      • nanoid:使用 customAlphabet 函数创建了自定义的 ID 生成器 nanoid,用于生成唯一 ID。
      • createChunkDecoder:用于将 Uint8Array 类型的数据块解码成字符串(Uint8Array -> string)。
      // utils.ts
      import { customAlphabet } from 'nanoid';
      export const nanoid = customAlphabet(
      '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz',
      7
      );
      export function createChunkDecoder() {
      const decoder = new TextDecoder();
      return function (chunk: Uint8Array | undefined): string {
      if (!chunk) return '';
      return decoder.decode(chunk, { stream: true });
      };
      }
      接下来就是发送网络请求到 /api/chat ,请求成功之后将数据渲染出来。
      export function useChat({
      api = '/api/chat',
      id,
      initialInput = '',
      initialMessages = [],
      }: UseChatOptions = {}): UseChatHelpers {
      // 省略其它代码...
      const { error, trigger, isMutating } = useSWRMutation<
      string | null,
      any,
      [string, string],
      Message[]
      >(
      [api, chatId],
      async (_, { arg: messagesSnapshot }) => {
      try {
      const abortController = new AbortController();
      abortControllerRef.current = abortController;
      // 保存上一次的消息列表,用于时光回溯
      const previousMessages = messagesRef.current;
      // 先更新UI
      mutate(messagesSnapshot, false);
      // 处理参数
      const body = messagesSnapshot.map(({ role, content }) => ({
      role,
      content,
      }));
      const res = await fetch(api, {
      method: 'POST',
      body: JSON.stringify({ messages: body }),
      signal: abortController.signal,
      }).catch((err) => {
      // 如果报错了,回退到上一次的消息列表
      mutate(previousMessages, false);
      throw err;
      });
      if (!res.ok) {
      // 如果接口请求不成功,回退到上一次的消息列表
      mutate(previousMessages, false);
      throw new Error(
      (await res.text()) || 'Faild to fetch the chat response.'
      );
      }
      // body为空,直接报错
      if (!res.body) {
      throw new Error('The response body is empty.');
      }
      let result = '';
      const createdAt = new Date();
      // 创建唯一的消息ID
      const replyId = nanoid();
      // 使用 reader 的方式读取ReadableStream
      const reader = res.body.getReader();
      // 使用 TextDecoder 进行将 Uint8Array 解码成 string
      const decode = createChunkDecoder();
      while (true) {
      const { done, value } = await reader.read();
      if (done) {
      break;
      }
      // 将二进制value 解析成字符串,然后进行拼接
      result += decode(value);
      // 及时更新UI
      mutate(
      [
      ...messagesSnapshot,
      {
      id: replyId,
      createdAt,
      content: result,
      role: 'assistant',
      },
      ],
      false
      );
      // 如果请求取消了,则需要暂停读取stream
      if (abortControllerRef.current === null) {
      reader.cancel();
      break;
      }
      }
      abortControllerRef.current = null;
      return result;
      } catch (err) {
      if ((err as any).name === 'AbortError') {
      abortControllerRef.current = null;
      return null;
      }
      throw err;
      }
      },
      {
      revalidate: false, // 不需要重新验证缓存
      }
      );
      const append = useCallback(
      async (message: Message | CreateMessage) => {
      if (!message.id) {
      message.id = nanoid();
      }
      // 将消息添加到消息列表,手动触发接口请求
      return trigger(messagesRef.current.concat(message as Message));
      },
      [trigger]
      );
      const handleSubmit = useCallback(
      (e: FormEvent<HTMLFormElement>) => {
      e.preventDefault();
      // 如果输入框没有内容,直接返回
      if (!input) {
      return;
      }
      // 将内容添加到消息列表中
      append({
      content: input,
      role: 'user',
      createdAt: new Date(),
      });
      setInput('');
      },
      [append, input]
      );
      return {
      error,
      append,
      handleSubmit,
      isLoading: isMutating,
      };
      }
      接下来就是将上面的 hook 逻辑和视图绑定在一起。
      // app/page.tsx
      const Chat = () => {
      const { messages, input, handleSubmit, handleInputChange } = useChat();
      return (
      <div className="flex h-full flex-col w-full max-w-xl pb-36 pt-9 mx-auto stretch">
      <ul className="space-y-4">
      {messages.map((message) => (
      <MessageCard key={message.id} message={message} />
      ))}
      </ul>
      <form onSubmit={handleSubmit}>
      <input
      value={input}
      onChange={handleInputChange}
      className="w-full p-3 focus-visible:outline-gray-300 rounded shadow-xl focus:shadow-2xl transition-all"
      placeholder="随便说点什么..."
      />
      </form>
      </div>
      );
      };
      export default Chat;

      连通性验证

      不出意外的报 bug 了。
      readablestream-error.png
      readablestream-error.png
      问题在于 fetch.body 方法返回的是一个二进制流(Uint8Array),fetch 提供了 text() json() blob() 等方式将二进制流转化为其他数据格式。
      刚刚在 libs/ai-stream.ts 里面推到队列里面的是字符串,所以就会报错。
      export function AIStream(): ReadableStream {
      const stream = new ReadableStream({
      start(controller) {
      controller.enqueue('hello\n'); // 字符串
      controller.enqueue('world\n'); // 字符串
      controller.close();
      },
      });
      return stream;
      }
      有两种解决方式。
      1. enqueue 推到队列的类型改成二进制形式:
      const textEncoder = new TextEncoder();
      const stream = new ReadableStream({
      start(controller) {
      controller.enqueue(textEncoder.encode('hello\n'));
      controller.enqueue(textEncoder.encode('world\n'));
      controller.close();
      },
      });
      创建了 TextEncoder 对象,用于将字符串编码为 Uint8Array 对象。
      1. 采用 TransformStream 可以对输入的数据进行转换处理:
      export function createCallbacksTransformer() {
      const encoder = new TextEncoder();
      return new TransformStream<string, Uint8Array>({
      async transform(message, controller): Promise<void> {
      controller.enqueue(encoder.encode(message));
      },
      });
      }
      export function AIStream(): ReadableStream {
      const stream = new ReadableStream({
      start(controller) {
      controller.enqueue('hello\n');
      controller.enqueue('world\n');
      controller.close();
      },
      });
      return stream.pipeThrough(createCallbacksTransformer());
      }
      通过使用 pipeThrough 方法,将 AIStream 的输出流连接到 createCallbacksTransformer 的输入流,实现了数据的转换和传递。
      实现将 string 转换为 Uint8Array 对象的流处理过程。
      本文后面会使用第二种方式。
      我们来看看效果:
      connect-api-route.gif
      connect-api-route.gif
      🎉🎉🎉 完成了一大步 🎉🎉🎉。

      连接 openai

      在连接 openai 之前,需要先安装 openai-edge。
      可以点击查看 openai-edge 具体的 api。
      pnpm add openai-edge
      接下来就是在 app/api/chat/route.ts 中连接 openai。
      // app/api/chat/route.ts
      const config = new Configuration({
      // 申请好的 OPENAI_API_KEY
      apiKey: process.env.OPENAI_API_KEY,
      });
      const openai = new OpenAIApi(config);
      export const runtime = 'edge';
      export async function POST(req: Request) {
      // 获得请求参数
      const { messages } = await req.json();
      const response = await openai.createChatCompletion({
      model: 'gpt-3.5-turbo',
      stream: true,
      messages: messages.map((message: any) => ({
      content: message.content,
      role: message.role,
      })),
      });
      // 将 response 传递给 AIStream 进行处理
      const stream = AIStream(response);
      return new StreamingTextResponse(stream);
      }
      这里 process.env.OPENAI_API_KEY 读取的是环境变量,我们可以新建一个 .env.local 的文件。
      OPENAI_API_KEY=api key
      新建完成之后需要重启一下服务,加载一下环境变量。
      使用.env.local 命名是为了防止将该文件提交到 git,如果上传到 github 会暴露 apiKey。
      在服务端请求 openai api,并将 openai 返回的 response 传递给了 AIStream,AIStream 也需要更新一下。
      // libs/ai-stream.ts
      export function AIStream(res: Response): ReadableStream {
      if (!res.ok) {
      throw new Error(
      `Failed to convert the response to stream. Received status code: ${res.status}.`
      );
      }
      const stream =
      res.body ||
      new ReadableStream({
      start(controller) {
      controller.close();
      },
      });
      return stream;
      // .pipeThrough(createCallbacksTransformer);
      }
      res.body 返回的是 ReadableStream<Uint8Array> ,之前写的 pipeThrough(createCallbacksTransformer) 是将 string -> Uint8Array,目前先用不到,先注释掉。
      再来看看效果:
      result-not-decode-demo.gif
      result-not-decode-demo.gif

      sse 数据

      从 gif 图上看到返回"data: json 字符串"这样子的格式,这是 sse 数据格式。
      sse 数据:每一次发送的信息,由若干个 message 组成,每个 message 之间用\n\n 分隔。
      每个 message 内部由若干行组成,每一行都是如下格式。
      [field]: value\n
      上面的 field 可以取四个值。
      data
      event
      id
      retry
      sse 数据格式可以看看 阮一峰老师的这篇文章 ,写的还是挺详细的。

      如何解析 sse 数据

      sse 数据如何进行解析呢? 可以使用 eventsource-parser 这个库,这个库就是用来解析 sse 数据的。
      pnpm add eventsource-parser
      fetch.body 返回的是 Uint8Array ,还是使用 TransformStream,在 TransformStream 中进行 sse 数据的解析。
      1. 创建 TransformStream 将 Uint8Array 格式先转成 string 格式,解析 sse 数据之后再进一步把 content 读取出来。
      2. 然后延用上面的 createCallbacksTransformer 将 string 格式再转成 Uint8Array 格式。
      先新建函数 createEventStreamTransformer
      // libs/ai-stream.ts
      import {
      createParser,
      type EventSourceParser,
      type ParseEvent,
      } from 'eventsource-parser';
      export function createEventStreamTransformer(customParser: AIStreamParser) {
      const decoder = new TextDecoder();
      let parser: EventSourceParser;
      return new TransformStream<Uint8Array, string>({
      async start(controller) {
      function onParse(event: ParseEvent) {
      if (event.type === 'event') {
      const data = event.data; // data 就是解析之后的每个数据
      if (data === '[DONE]') {
      // 如果是[DONE] 表示解析完毕。
      controller.terminate();
      return;
      }
      // 将 json 数据字符串传入到自定义解析器进行过滤
      const message = customParser(data);
      // 如果 message 有值的话,就推到队列中。
      if (message) controller.enqueue(message);
      }
      }
      // 创建解析器
      parser = createParser(onParse);
      },
      transform(chunk) {
      parser.feed(decoder.decode(chunk));
      },
      });
      }
      在 start 方法中,创建了 parser,用来解析数据流中的 sse 数据。
      在 transform 方法中,此时 chunk 是 Uint8Array 类型,先使用 TextDecoder 将 Uint8Array 解析成字符串,然后通过 feed 进行 sse 数据的解析,解析后的数据传递给 onParse 函数。
      这里为了逻辑解耦,使用了 customParser 自定义解析器,传入 customParser 来实现解析具体的逻辑(下面会介绍)。
      流可以使用多次 pipeThrough,进行多次转化。
      // libs/ai-stream.ts
      export function AIStream(res: Response): ReadableStream {
      // 省略其他代码...
      return stream
      .pipeThrough(createEventStreamTransformer())
      .pipeThrough(createCallbacksTransformer());
      }
      • 第一次 pipeThrough:将 Uint8Array -> string。
        • 具体的这一步流程是:Uint8Array -> eventsource-parser 解析 sse 数据 -> 走到 onParse 函数 -> customParser(sse 数据 json 字符串) -> string
      • 第二次 pipeThrough:将第一次 pipeThrough 的结果 string -> Uint8Array。
      pipe-through-stream.png
      pipe-through-stream.png
      AIStream 最终返回的依然是 Uint8Array 格式的流。

      自定义解析器

      createEventStreamTransformer 方法传入了自定义解析器,customParser 该怎么写呢?
      libs/ai-stream.ts 文件就只负责流的处理,将 customParser 作为参数传入即可,具体如何解析取决于上层应用。
      // libs/ai-stream.ts
      export function AIStream(
      res: Response,
      customParser: AIStreamParser // 新增
      ): ReadableStream {
      // 省略其他代码...
      return stream
      .pipeThrough(createEventStreamTransformer(customParser)) // 传入
      .pipeThrough(createCallbacksTransformer());
      }
      新建文件 libs/openai-stream.ts 实现 openai 流的 customParser。
      // libs/openai-stream.ts
      import { AIStream } from './ai-stream';
      export function trimStartOfStreamHelper() {
      let start = true;
      return (text: string) => {
      if (start) text = text.trimStart();
      if (text) start = false;
      return text;
      };
      }
      function parseOpenAIStream(): (data: string) => string | void {
      const trimStartOfStream = trimStartOfStreamHelper();
      return (data) => {
      const json = JSON.parse(data); // 将 json 字符串解析成对象
      const text = trimStartOfStream(
      json.choices[0]?.delta?.content ?? json.choices[0]?.text ?? ''
      ); // 读取对应的字段
      return text;
      };
      }
      export function OpenAIStream(res: Response): ReadableStream {
      return AIStream(res, parseOpenAIStream());
      }
      一个消息头部可能有多个空格,使用 trimStartOfStreamHelper 辅助函数把 chunk 最前面的空格给去掉。
      因为 data 传入的时候已经是 sse 的数据部分,是 json 字符串,可以使用 JSON.parse 的方式来解析成对象,最后在读取相对应的字段即可。
      再更新一下 app/api/chat/route.ts 文件,将 AIStream 替换成 OpenAIStream。
      // app/api/chat/route.ts
      export async function POST(req: Request) {
      // 省略其他代码...
      const stream = OpenAIStream(response); // AIStream() -> OpenAIStream()
      return new StreamingTextResponse(stream);
      }
      再来看看效果
      basic-demo.gif
      basic-demo.gif
      🎉🎉🎉 牛哇,实现啦!!! 🎉🎉🎉

      实现 regenerate

      如果对当前生成的结果不满意,重新生成新的结果。
      // hooks/use-chat.ts
      export type UseChatHelpers = {
      reload: () => Promise<string | null | undefined>;
      };
      export function useChat({
      api = '/api/chat',
      id,
      initialInput = '',
      initialMessages = [],
      }: UseChatOptions = {}): UseChatHelpers {
      // 省略其他代码...
      const reload = useCallback(async () => {
      if (messagesRef.current.length === 0) return null;
      const lastMessage = messagesRef.current[messagesRef.current.length - 1];
      // 如果最后一条消息是 chatgpt 生成的
      if (lastMessage.role === 'assistant') {
      // 去掉消息列表的最后一条消息,然后触发接口请求
      return trigger(messagesRef.current.slice(0, -1));
      }
      return trigger(messagesRef.current);
      }, [trigger]);
      return {
      // 省略其他的代码...
      reload,
      };
      }
      新增 react-feather 添加几个好看的图标,顺便也把 input 输入框美化一下。
      // app/page.tsx
      // 省略其他代码...
      import { Pause, Send, RotateCw, MoreHorizontal } from 'react-feather';
      import classNames from 'classnames';
      const Chat = () => {
      // 省略其他代码...
      const { reload, messages } = useChat();
      const disabledClassName = isLoading
      ? 'cursor-not-allowed pointer-events-none opacity-70'
      : '';
      return (
      // 省略其他代码...
      <div className="fixed w-full left-0 bottom-0 py-4 bg-gray-100 border-t border-t-gray-300">
      {messages.length > 0 ? (
      <button
      className="mb-2 mx-auto border border-gray-300 bg-gray-100 text-gray-600 p-2 px-8 rounded-md hover:bg-gray-200 transition-all flex items-center"
      onClick={reload}
      >
      <RotateCw className="mr-2" size={16} />
      重新生成
      </button>
      ) : null}
      <form
      onSubmit={handleSubmit}
      className="max-w-xl w-full mx-auto relative"
      >
      <div className="relative">
      <input
      disabled={isLoading}
      value={input}
      onChange={handleInputChange}
      className={classNames(
      'w-full p-3 focus-visible:outline-gray-300 border border-gray-300 bg-gray-100 rounded-md shadow-xl focus:shadow-2xl transition-all',
      disabledClassName
      )}
      placeholder="随便说点什么..."
      />
      {isLoading ? (
      <button
      className={classNames(
      'absolute right-3 bg-gray-200 p-1 top-1/2 -translate-y-1/2 rounded max-w-xs transition-all',
      disabledClassName
      )}
      >
      <MoreHorizontal size={16} />
      </button>
      ) : (
      <button className="absolute right-3 bg-gray-200 hover:text-white p-1 top-1/2 -translate-y-1/2 rounded max-w-xs hover:bg-green-400 transition-all">
      <Send size={16} />
      </button>
      )}
      </div>
      </form>
      </div>
      );
      };
      看看具体的效果:
      reload-demo.gif
      reload-demo.gif

      实现 stop

      hooks/use-chat.ts 中使用了 AbortController 传递给了 fetch 函数,写一个 stop 方法来实现暂停。
      // hooks/use-chat.ts
      export type UseChatHelpers = {
      stop: () => void;
      };
      export function useChat({
      api = '/api/chat',
      id,
      initialInput = '',
      initialMessages = [],
      }: UseChatOptions = {}): UseChatHelpers {
      // 省略其他代码...
      const stop = useCallback(() => {
      if (abortControllerRef.current) {
      // 取消请求
      abortControllerRef.current.abort();
      abortControllerRef.current = null;
      }
      }, []);
      return {
      // 省略其他的代码...
      stop,
      };
      }
      如果在生成中会出现暂停的图标,再更新一下 UI。
      // app/page.tsx
      import { Pause, Send } from 'react-feather';
      const Chat = () => {
      // 省略其它代码...
      const { stop, isLoading, messages } = useChat();
      const getBtnContent = () => {
      if (isLoading) {
      return (
      <>
      <Pause className="mr-2" size={16} />
      暂停生成
      </>
      );
      }
      return (
      <>
      <RotateCw className="mr-2" size={16} />
      重新生成
      </>
      );
      };
      return (
      // 省略其他代码...
      <div className="fixed w-full left-0 bottom-0 py-4 bg-gray-100 border-t border-t-gray-300">
      {messages.length > 0 ? (
      <button
      className="mb-2 mx-auto border border-gray-300 bg-gray-100 text-gray-600 p-2 px-8 rounded-md hover:bg-gray-200 transition-all flex items-center"
      onClick={isLoading ? stop : reload}
      >
      {getBtnContent()}
      </button>
      ) : null}
      <form
      onSubmit={handleSubmit}
      className="max-w-xl w-full mx-auto relative"
      >
      {/* 省略其他代码... */}
      </form>
      </div>
      );
      };
      来看看效果
      stop-demo.gif
      stop-demo.gif
      有一点小缺陷:点击 stop 只是前端不再读取网络流,所以只是前端的渲染暂停,但是此时网络流还是没有断,更好的方式应该是发送消息给后端,后端主动中断流,然后再停止前端的渲染。

      渲染 markdown

      openai 是支持写 markdown 格式的,可以引入 markdown -> html 的包进行渲染。
      需要安装 marked 和 @tailwindcss/typography。
      • marked:将 markdown 解析成 html。
      • @tailwindcss/typography:漂亮的文案排版。
      pnpm add marked
      pnpm add @tailwindcss/typography @types/marked -D
      新建 libs/marked.ts 文件,用来写 markdown -> HTML 的方法
      // libs/marked.ts
      import { marked } from 'marked';
      export const markdownToHTML = (markdown: string) => {
      if (!markdown || typeof markdown !== 'string') {
      return '';
      }
      return marked.parse(markdown);
      };
      @tailwindcss/typography 的配置也比较简单,在 tailwind.config.js 的 plugin 配置一下。
      // tailwind.config.js
      /** @type {import('tailwindcss').Config} */
      module.exports = {
      // 省略其他代码...
      plugins: [require('@tailwindcss/typography')],
      };
      app/MessageCard.tsx 文件中,给需要进行渲染 markdown html 的内容添加上类名:prose 。
      // app/MessageCard.tsx
      import { markdownToHTML } from '@/libs/marked';
      const MessageCard = ({ message }: MessageCardProps) => {
      const content = markdownToHTML(message.content);
      return (
      <div className="flex items-start">
      <Avatar role={message.role} />
      <div
      className="pl-2 leading-6 prose transition-all max-w-xl" // 在这里!
      dangerouslySetInnerHTML={{ __html: content }}
      ></div>
      </div>
      );
      };
      来看看效果:
      markdown-demo.gif
      markdown-demo.gif

      保持滚动条在底部

      如果渲染的消息过长,或者消息过多时,得手动进行滚动,写一个 hook 要滚动条一直维持在底部。
      新建 hook: hooks/use-scroll-bottom
      需要先安装 lodash.throttle
      pnpm add lodash.throttle
      // hooks/use-scroll-bottom.ts
      import throttle from 'lodash.throttle';
      import { RefObject, useEffect } from 'react';
      interface UseScrollBottomOptions {
      scrollRef: RefObject<HTMLElement>;
      }
      const useScrollBottom = ({ scrollRef }: UseScrollBottomOptions) => {
      useEffect(() => {
      const scrollingElement = scrollRef.current;
      const callback: MutationCallback = function (mutationsList) {
      for (let mutation of mutationsList) {
      if (mutation.type === 'childList') {
      window.scrollTo(0, document.body.scrollHeight);
      }
      }
      };
      const throttleCallback = throttle(callback, 1000 / 16);
      const observer = new MutationObserver(throttleCallback);
      if (scrollingElement) {
      observer.observe(scrollingElement!, {
      subtree: true,
      childList: true,
      });
      }
      return () => {
      observer.disconnect();
      };
      }, []);
      };
      export default useScrollBottom;
      app/page.tsx 中导入进行使用即可。
      // app/page.tsx
      const Chat = () => {
      const scrollRef = useRef<HTMLUListElement | null>(null);
      useScrollBottom({ scrollRef });
      return (
      <ul className="space-y-4" ref={scrollRef}>
      // 绑定ref
      {messages.map((message) => (
      <MessageCard key={message.id} message={message} />
      ))}
      </ul>
      );
      };