手摸手带你实现前端乞丐版的 ChatGPT
589
人阅读
8 分钟阅读
一开始看到 chatgpt 的流式渲染,有点好奇流式渲染是如何实现的,无意之间发现 vercel 的库:ai,仔细学习了它的代码,写的小而精,把它看明白了之后,想着写一篇文章来输出一下。 所以我就根据 ai 这个库,来一步步带大家来实现前端乞丐版 chatgpt ,把这篇文章看完之后,也可以去看看 ai 这个库,代码是写的真不错(会发现代码都是抄它的,哈哈哈哈)!!!
一开始看到 chatgpt 的流式渲染,有点好奇流式渲染是如何实现的,无意之间发现 vercel 的库:ai,仔细学习了它的代码,写的小而精,把它看明白了之后,想着写一篇文章来输出一下。 所以我就根据 ai 这个库,来一步步带大家来实现前端乞丐版 chatgpt ,把这篇文章看完之后,也可以去看看 ai 这个库,代码是写的真不错(会发现代码都是抄它的,哈哈哈哈)!!!
一开始看到 chatgpt 的流式渲染,有点好奇流式渲染是如何实现的,无意之间发现 vercel 的库:ai,仔细学习了它的代码,写的小而精,把它看明白了之后,想着写一篇文章来输出一下。
所以我就根据 ai 这个库,来一步步带大家来实现前端乞丐版 chatgpt ,把这篇文章看完之后,也可以去看看 ai 这个库,代码是写的真不错(会发现代码都是抄它的,哈哈哈哈)!!!
本文会使用 Next13,不熟悉也没关系,用到的 api 不多,不懂的 api 可以查看
Next13 文档
。
注意:需要申请有一个 openai 的
apiKey
,不然就无法调用接口哦。
效果展示:
什么是网络流
先看看一个概念:网络流,平时可能用不到。流是一种用于访问数据的数据结构,比如说:文件、接口返回的数据等等。
使用流有两个好处:
-
可以处理大量数据,流可以将它们分成更小的部分(chunk),可以一次处理一个(chunk)。
-
可以使用相同的数据结构、流,同时处理不同的数据,这使得代码变得更加复用。
在网络流中,一个 chunk 通常是:
-
文本流:string
-
二进制流:Uint8Arrays
网络流主要有三种:
-
ReadableStream:用于从数据源中读取数据。执行此操作的代码成为消费者。
-
WritableStream:用户将数据写入。执行此操作的代码成为生产者。
-
TransformStream 由两个流组成:
-
它从其可写端(WritableStream)接收输入。
-
它将输出发送到它的可读端,一个 ReadableStream。
本文中只会使用到 ReadableStream 和 TransformStream。ReadableStream
ReadableStream 可以从各种来源读取数据块,类型声明如下:interface ReadableStream<TChunk> {getReader(): ReadableStreamDefaultReader<TChunk>;pipeThrough<TChunk2>(transform: ReadableWritablePair<TChunk2, TChunk>,options?: StreamPipeOptions): ReadableStream<TChunk2>;readonly locked: boolean;// 忽略// [Symbol.asyncIterator](): AsyncIterator<TChunk>;// cancel(reason?: any): Promise<void>;// pipeTo(// destination: WritableStream<TChunk>,// options?: StreamPipeOptions// ): Promise<void>;// tee(): [ReadableStream<TChunk>, ReadableStream<TChunk>];}这三个属性的作用是:-
getReader()
:返回一个 Reader,可以从 ReadableStream 读取的对象,返回的 Reader 类似于迭代器。 -
locked
:每个 ReadableStream 一次只能有一个活动的 Reader,当 Reader 在使用时,ReadableStream 被锁定并且getReader()
不能被调用。 -
pipeThrough()
:将其 ReadableStream 连接到 ReadableWritablePair(一个 TransformStream)。它返回一个新的 ReadableStream(类比一下:把它理解成一个数组的 map 方法)。
下面来看看getReader()
的返回类型:interface ReadableStreamGenericReader {cancel(reason?: any): Promise<void>;// 忽略// readonly closed: Promise<undefined>;}interface ReadableStreamDefaultReader<TChunk>extends ReadableStreamGenericReader {releaseLock(): void;read(): Promise<ReadableStreamReadResult<TChunk>>;}interface ReadableStreamReadResult<TChunk> {done: boolean;value: TChunk | undefined;}-
cancel()
:在一个活动的 Reader 中,这个方法取消关联的 ReadableStream。 -
releaseLock()
:停用 Reader 并解锁流。 -
read()
返回来两个属性的 ReadableStreamReadResult 的 Promise:-
done
:布尔值,false 表示可以读取,true 表示最后一个块。 -
value
:块(chunk)。
师傅,别念 api 了,再念人都要傻了,赶紧来一个 demo 吧。以下是通过 getReader 方式来读取 ReadableStream 的小例子。const reader = readableStream.getReader(); // (A)console.log(readableStream.locked); // true (B)let result = '';try {while (true) {const { done, value } = await reader.read(); // (C)if (done) {break;}result += value; // (D)}} finally {reader.releaseLock(); // (E)}console.log('result', result);-
A 行:不能直接读取 readableStream,需要调用
getReader()
来获取 Reader。 -
B 行:在
getReader()
之后,readableStream 被锁定,所以 B 行打印的是 true,如果想再次调用getReader()
,必须调用releaseLock()
(E 行)。 -
C 行:
read()
返回属性 done 和 value,如果 done 为 true,表示是最后一个块, -
D 行:可以对返回的 value 进行操作,这里是将返回的 value 全部都加在一个字符串里。
通过包装将数据源转化为 ReadableStream
如果想通过 ReadableStream 读取外部源,可以将其包装在适配器对象中并将该对象传递给构造函数 ReadableStream。new ReadableStream(underlyingSource?, queuingStrategy?)以下是类型声明:interface UnderlyingSource<TChunk> {start?(controller: ReadableStreamController<TChunk>): void | Promise<void>;// 忽略// pull?(controller: ReadableStreamController<TChunk>): void | Promise<void>;// cancel?(reason?: any): void | Promise<void>;// type: 'bytes' | undefined;// autoAllocateChunkSize: bigint;}-
start(controller)
调用构造函数后立即调用 start 方法。
controller 的参数类型如下:type ReadableStreamController<TChunk> =| ReadableStreamDefaultController<TChunk>| ReadableByteStreamController<TChunk>; // 先忽略interface ReadableStreamDefaultController<TChunk> {enqueue(chunk?: TChunk): void;close(): void;// 忽略// readonly desiredSize: number | Null;// error(err?: any): void;}-
enqueue(chunk)
:添加 chunk 到 ReadableStream 的内部队列。 -
close()
:关闭 ReadableStream,消费者仍然可以清空队列,在那之后,流结束。
自定义 ReadableStream demo
async function test14() {const readableStream = new ReadableStream({start(controller) {controller.enqueue('First Line\n'); // (A)controller.enqueue('Second Line\n'); // (B)controller.close(); // (C)},});for await (const chunk of readableStream) {console.log(chunk);}}test14();// First Line// Second LineReadableStream 是异步可迭代的,可以使用for-await-of
来进行迭代。使用控制器创建一个包含两个块的流(A 和 B 行),关闭流(C 行)很重要,否则for-await-of
永远不会结束。TransformStream
-
通过其可写端(WritableStream)接受输入。
-
然后它可能会或可能不会转换输入。
-
结果可以通过 ReadableStream 读取,它是可读的。
使用TransformStream
最常见的方式是pipeThrough
。const transformStream = readableStream.pipeThrough(transformStream);.pipeThrough()
将 readableStream 传输到 transformStream 的可写端,并进行转换返回其可读端。换句话说: 创建了一个新的 ReadableStream,它是 ReadableStream 的转换版本,类似于数组的 map。一个简单的 demo:async function test21() {const encoder = new TextEncoder();const readableByteStream = new ReadableStream({start(controller) {controller.enqueue(encoder.encode('hello\n'));controller.enqueue(encoder.encode('world\n'));},});const readableStream = readableByteStream.pipeThrough(new TextDecoderStream('utf-8'));for await (const stringChunk of readableStream) {console.log(stringChunk);}}test21();TextEncoder.encode():将字符串作为输入,并返回 Uint8Array 包含 UTF-8 编码的文本。使用了内置 TransformStream:TextDecoderStream()
,作用就是将接收到的二进制流转换为可读的文本流(Uint8Array -> string)。自定义 TransformStream
跟上面的 ReadableStream 类似,如果要自定义 TransformStream,也可以传递适配器对象给构造函数 TransformStream。它具有以下类型:interface TransformStream<InChunk, OutChunk> {start?(controller: TransformStreamDefaultController<OutChunk>): void | Promise<void>;transform?(chunk: InChunk,controller: TransformStreamDefaultController<OutChunk>): void | Promise<void>;// 忽略// flush?(// chunk: InChunk,// controller: TransformStreamDefaultController<OutChunk>// ): void | Promise<void>;}上面属性的解释:-
start(controller)
:在调用构造函数后立即被调用,可以在转换之前做一些准备。 -
transform(chunk, controller)
执行实际的转化 。接受一个输入块,并可以使用 controller 将一个或多个转换后的输出块排队。
该 contrller 具有以下类型:interface TransformStreamDefaultController<OutChunk> {enqueue(chunk?: OutChunk): void;terminate(): void;// 忽略// readonly desiredSize: number | null;// error(err?: any): void;}-
enqueue(chunk)
:添加 chunk 到 TransformStream 的可读端(输出)。 -
terminate()
:关闭 TransformStream 的可读端(输出)并在可写端(输入)出错。如果转换器对可写端(输入)的剩余块不感兴趣并想跳过他们,则可以使用它。
小 demo
说了一大堆 api,来一个简单的例子:async function test20() {// 创建一个 ReadableStream 对象const readableStream = new ReadableStream({start(controller) {controller.enqueue('hello');controller.enqueue('world');controller.close();},});// 创建一个 TransformStream 对象const transformer = new TransformStream({transform(chunk, controller) {// 对输入数据进行转换处理const transformedChunk = chunk.toUpperCase();// 将转换后的数据通过 controller.enqueue() 方法推送到输出流 ReadableStreamcontroller.enqueue(transformedChunk);},});// 通过 TransformStream 进行处理流的转换const newReadableStream = readableStream.pipeThrough(transformer);// 使用 reader 方式来读取 ReadableStreamconst reader = newReadableStream.getReader();while (true) {const { done, value } = await reader.read();if (done) {break;}console.log(value); // HELLO WORLD}}test20();重新回顾 fetch API
Fetch API 是一种用于获取和发送网络资源的现代 Web API。它提供了一种替代 XMLHttpRequest 的方式,可以更简单、更灵活地进行网络请求。Fetch API 使用 Promise 对象来返回请求结果,可以轻松地将其与 async/await 结合使用。简单的小例子:async function main() {const res = await fetch('https://example.com');const data = await res.json();console.log('data', data);}main();这里要用的是 fetch 返回的属性body
,它返回的是ReadableStream
。async function main() {const res = await fetch('https://example.com');const body = res.body; // ReadableStreamconst reader = body.getReader();while (true) {const { done, value } = await reader.read();if (done) {break;}console.log('value', value);}}main();用到了上面提到的 ReadableStream,也算是简单的回顾一下。需要注意的是:fetch.body
返回的是二进制流,后面会再提到 。讲到这里,终于把前置的知识熟悉一下,我知道你很急,但是你先别急。我们来进入实战环节。初始化项目
首先使用pnpm create next-app
初始化一个 Next13 项目。-
What is your project named? openai-stream
-
Would you like to use TypeScript with this project? Yes
-
Would you like to use ESLint with this project? Yes
-
Would you like to use Tailwind CSS with this project? Yes
-
Would you like to use
src/
directory with this project? No -
Use App Router (recommended)? Yes
-
Would you like to customize the default import alias? No
初始化之后就会安装 TypeScript、Eslint 和 Tailwind CSS。当个 UI 仔也挺好
第一步就开始画 UI。UI 的话主要有两个部分:-
消息列表展示。
-
input 输入框。
MessageCard 渲染信息
创建类型文件type.ts
定义关于 message 的类型。// types.tsexport type Message = {id: string;createdAt?: Date;content: string;role: 'system' | 'user' | 'assistant';};MessageCard 组件用来渲染输入和 openai 返回的信息。import { Message } from '@/types';import classNames from 'classnames';interface MessageCardProps {message: Message;}type AvatarProps = Pick<Message, 'role'>;const Avatar = ({ role }: AvatarProps) => {const getName = () => (role === 'user' ? 'U' : 'AI');return (<spanclassName={classNames('w-6 h-6 inline-flex items-center justify-center rounded-full min-w-[24px]',role === 'user' ? 'bg-orange-300' : 'bg-green-400')}>{getName()}</span>);};const MessageCard = ({ message }: MessageCardProps) => {return (<div key={message.id} className="flex items-center"><Avatar role={message.role} /><div className="ml-2">{message.content}</div></div>);};export default MessageCard;基础页面 + input 输入框
下一步画基础的页面和 input 输入框。直接在app/page.tsx
里面书写即可。// app/page.tsx'use client';import MessageCard from './MessageCard';import { Message } from '@/types';const Chat = () => {const messages: Message[] = [{ id: '1', content: 'hello', role: 'user' },{ id: '2', content: 'world', role: 'assistant' },];return (<div className="flex h-full flex-col w-full max-w-xl pb-36 pt-9 mx-auto stretch"><ul className="space-y-4">{messages.map((message) => (<MessageCard key={message.id} message={message} />))}</ul><formonSubmit={(e) => {e.preventDefault();}}><inputclassName="w-full p-3 focus-visible:outline-gray-300 border border-gray-300 rounded-md shadow-xl focus:shadow-2xl transition-all"placeholder="随便说点什么..."/></form></div>);};export default Chat;先 mock 消息列表,看看展示效果咋样。在 app 目录下新建文件app/api/chat/route.ts
,用来处理 api 请求。// app/api/chat/route.ts// GET 是书写规范!!!export async function GET() {const data = 'hello world';return NextResponse.json({ data });}可以直接访问 http://localhost:3000/api/chat 可以看到返回的数据。关于 Next.js 的 Route 可以查看 相关文档 ,不过多赘述。需要发送 POST 请求将 message 传递给 openai,通过export function POST
就可以处理 POST 请求。// app/api/chat/route.tsexport const runtime = 'edge'; // 如果要流式渲染则需要加上这一行export async function POST(req: Request) {const stream = AIStream();console.log('stream:', stream);return new StreamingTextResponse(stream);}libs/streaming-text-response.ts
就是对于 Response 的简单封装,将状态码置为 200。// libs/streaming-text-response.tsexport class StreamingTextResponse extends Response {constructor(res: ReadableStream, init?: ResponseInit) {super(res as any, {...init,status: 200,headers: {'Content-Type': 'text/plain; charset=utf-8',...init?.headers,},});}}创建文件libs/ai-stream.ts
用来处理网络流,使用 ReadableStream 先 mock 两条数据。// libs/ai-stream.tsexport function AIStream(): ReadableStream {const stream = new ReadableStream({start(controller) {controller.enqueue('hello\n');controller.enqueue('world\n');controller.close();},});return stream;}hook: use-chat
下一步写一个 hook 来进行页面交互,创建文件hooks/use-chat.ts
。需要安装一些依赖包:nanoid 和 swr。-
nanoid 是可以生成唯一 ID 的库。
-
swr 是用于数据请求和缓存的库。
pnpm add nanoid swr将 CreateMessage 和 UseChatOptions 添加到types.ts
文件,定义好 use-chat 的类型声明。// types.ts// 省略其他代码...export type CreateMessage = {id?: string;createdAt?: Date;content: string;role: 'system' | 'user' | 'assistant';};// type.tsexport type UseChatOptions = {// 指定聊天功能的API地址,默认为'/api/chat'api?: string;// 指定聊天的唯一标识符,如果没有指定,则使用useId生成唯一的Hook IDid?: string;// 消息列表初始化内容,默认为空数组initialMessages?: Message[];// 输入框的初始内容,默认为空字符串initialInput?: string;};// hooks/use-chat.tsexport type UseChatHelpers = {// 当前消息列表messages: Message[];// 请求过程中可能出现的错误error: undefined | Error;// 向聊天中追加一条消息append: (message: Message | CreateMessage) => Promise<string | null | undefined>;// 设置聊天消息列表setMessages: (messages: Message[]) => void;// 输入框内容input: string;// 设置输入框内容setInput: Dispatch<SetStateAction<string>>;// 处理输入框内容变化handleInputChange: (e: ChangeEvent<HTMLInputElement> | ChangeEvent<HTMLTextAreaElement>) => void;// 处理表单提交handleSubmit: (e: FormEvent<HTMLFormElement>) => void;// 请求是否正在加载中isLoading: boolean;};export function useChat(options?: UseChatOptions): UseChatHelpers;use-chat 具体逻辑
下面来写对应的 use-chat 具体逻辑:export function useChat({api = '/api/chat',id,initialInput = '',initialMessages = [],}: UseChatOptions = {}): UseChatHelpers {const hookId = useId();// 生成一个chatIdconst chatId = id || hookId;const { data, mutate } = useSWR<Message[]>([api, chatId], null, {fallbackData: initialMessages,});const messages = data!;// 用 ref 保存最新的消息列表const messagesRef = useRef<Message[]>(messages);useEffect(() => {messagesRef.current = messages;}, [messages]);// 处理input输入const [input, setInput] = useState(initialInput);const handleInputChange = useCallback((e: ChangeEvent<HTMLInputElement> | ChangeEvent<HTMLTextAreaElement>) => {setInput(e.target.value);},[]);// 设置消息列表const setMessages = useCallback((messages: Message[]) => {mutate(messages, false);messagesRef.current = messages;},[mutate]);return {messages,setMessages,input,setInput,handleInputChange,};}使用useSWR
来声明一个状态,第二个参数(fetcher)传入 null,表示不需要进行网络请求,可以把它当成本地状态来处理,返回的 mutate 函数可以对这个状态进行更新。小小的工具函数
新建utils.ts
文件用来保存两个工具函数: nanoid、createChunkDecoder。-
nanoid:使用 customAlphabet 函数创建了自定义的 ID 生成器 nanoid,用于生成唯一 ID。
-
createChunkDecoder:用于将 Uint8Array 类型的数据块解码成字符串(Uint8Array -> string)。
// utils.tsimport { customAlphabet } from 'nanoid';export const nanoid = customAlphabet('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz',7);export function createChunkDecoder() {const decoder = new TextDecoder();return function (chunk: Uint8Array | undefined): string {if (!chunk) return '';return decoder.decode(chunk, { stream: true });};}接下来就是发送网络请求到/api/chat
,请求成功之后将数据渲染出来。export function useChat({api = '/api/chat',id,initialInput = '',initialMessages = [],}: UseChatOptions = {}): UseChatHelpers {// 省略其它代码...const { error, trigger, isMutating } = useSWRMutation<string | null,any,[string, string],Message[]>([api, chatId],async (_, { arg: messagesSnapshot }) => {try {const abortController = new AbortController();abortControllerRef.current = abortController;// 保存上一次的消息列表,用于时光回溯const previousMessages = messagesRef.current;// 先更新UImutate(messagesSnapshot, false);// 处理参数const body = messagesSnapshot.map(({ role, content }) => ({role,content,}));const res = await fetch(api, {method: 'POST',body: JSON.stringify({ messages: body }),signal: abortController.signal,}).catch((err) => {// 如果报错了,回退到上一次的消息列表mutate(previousMessages, false);throw err;});if (!res.ok) {// 如果接口请求不成功,回退到上一次的消息列表mutate(previousMessages, false);throw new Error((await res.text()) || 'Faild to fetch the chat response.');}// body为空,直接报错if (!res.body) {throw new Error('The response body is empty.');}let result = '';const createdAt = new Date();// 创建唯一的消息IDconst replyId = nanoid();// 使用 reader 的方式读取ReadableStreamconst reader = res.body.getReader();// 使用 TextDecoder 进行将 Uint8Array 解码成 stringconst decode = createChunkDecoder();while (true) {const { done, value } = await reader.read();if (done) {break;}// 将二进制value 解析成字符串,然后进行拼接result += decode(value);// 及时更新UImutate([...messagesSnapshot,{id: replyId,createdAt,content: result,role: 'assistant',},],false);// 如果请求取消了,则需要暂停读取streamif (abortControllerRef.current === null) {reader.cancel();break;}}abortControllerRef.current = null;return result;} catch (err) {if ((err as any).name === 'AbortError') {abortControllerRef.current = null;return null;}throw err;}},{revalidate: false, // 不需要重新验证缓存});const append = useCallback(async (message: Message | CreateMessage) => {if (!message.id) {message.id = nanoid();}// 将消息添加到消息列表,手动触发接口请求return trigger(messagesRef.current.concat(message as Message));},[trigger]);const handleSubmit = useCallback((e: FormEvent<HTMLFormElement>) => {e.preventDefault();// 如果输入框没有内容,直接返回if (!input) {return;}// 将内容添加到消息列表中append({content: input,role: 'user',createdAt: new Date(),});setInput('');},[append, input]);return {error,append,handleSubmit,isLoading: isMutating,};}接下来就是将上面的 hook 逻辑和视图绑定在一起。// app/page.tsxconst Chat = () => {const { messages, input, handleSubmit, handleInputChange } = useChat();return (<div className="flex h-full flex-col w-full max-w-xl pb-36 pt-9 mx-auto stretch"><ul className="space-y-4">{messages.map((message) => (<MessageCard key={message.id} message={message} />))}</ul><form onSubmit={handleSubmit}><inputvalue={input}onChange={handleInputChange}className="w-full p-3 focus-visible:outline-gray-300 rounded shadow-xl focus:shadow-2xl transition-all"placeholder="随便说点什么..."/></form></div>);};export default Chat;连通性验证
不出意外的报 bug 了。问题在于fetch.body
方法返回的是一个二进制流(Uint8Array),fetch 提供了text()
、json()
、blob()
等方式将二进制流转化为其他数据格式。刚刚在libs/ai-stream.ts
里面推到队列里面的是字符串,所以就会报错。export function AIStream(): ReadableStream {const stream = new ReadableStream({start(controller) {controller.enqueue('hello\n'); // 字符串controller.enqueue('world\n'); // 字符串controller.close();},});return stream;}有两种解决方式。-
enqueue 推到队列的类型改成二进制形式:
const textEncoder = new TextEncoder();const stream = new ReadableStream({start(controller) {controller.enqueue(textEncoder.encode('hello\n'));controller.enqueue(textEncoder.encode('world\n'));controller.close();},});创建了 TextEncoder 对象,用于将字符串编码为 Uint8Array 对象。-
采用 TransformStream 可以对输入的数据进行转换处理:
export function createCallbacksTransformer() {const encoder = new TextEncoder();return new TransformStream<string, Uint8Array>({async transform(message, controller): Promise<void> {controller.enqueue(encoder.encode(message));},});}export function AIStream(): ReadableStream {const stream = new ReadableStream({start(controller) {controller.enqueue('hello\n');controller.enqueue('world\n');controller.close();},});return stream.pipeThrough(createCallbacksTransformer());}通过使用 pipeThrough 方法,将 AIStream 的输出流连接到 createCallbacksTransformer 的输入流,实现了数据的转换和传递。实现将 string 转换为 Uint8Array 对象的流处理过程。本文后面会使用第二种方式。我们来看看效果:🎉🎉🎉 完成了一大步 🎉🎉🎉。连接 openai
在连接 openai 之前,需要先安装 openai-edge。可以点击查看 openai-edge 具体的 api。pnpm add openai-edge接下来就是在app/api/chat/route.ts
中连接 openai。// app/api/chat/route.tsconst config = new Configuration({// 申请好的 OPENAI_API_KEYapiKey: process.env.OPENAI_API_KEY,});const openai = new OpenAIApi(config);export const runtime = 'edge';export async function POST(req: Request) {// 获得请求参数const { messages } = await req.json();const response = await openai.createChatCompletion({model: 'gpt-3.5-turbo',stream: true,messages: messages.map((message: any) => ({content: message.content,role: message.role,})),});// 将 response 传递给 AIStream 进行处理const stream = AIStream(response);return new StreamingTextResponse(stream);}这里process.env.OPENAI_API_KEY
读取的是环境变量,我们可以新建一个.env.local
的文件。OPENAI_API_KEY=api key新建完成之后需要重启一下服务,加载一下环境变量。使用.env.local 命名是为了防止将该文件提交到 git,如果上传到 github 会暴露 apiKey。在服务端请求 openai api,并将 openai 返回的 response 传递给了 AIStream,AIStream 也需要更新一下。// libs/ai-stream.tsexport function AIStream(res: Response): ReadableStream {if (!res.ok) {throw new Error(`Failed to convert the response to stream. Received status code: ${res.status}.`);}const stream =res.body ||new ReadableStream({start(controller) {controller.close();},});return stream;// .pipeThrough(createCallbacksTransformer);}res.body
返回的是ReadableStream<Uint8Array>
,之前写的pipeThrough(createCallbacksTransformer)
是将 string -> Uint8Array,目前先用不到,先注释掉。再来看看效果:sse 数据
从 gif 图上看到返回"data: json 字符串"这样子的格式,这是 sse 数据格式。sse 数据:每一次发送的信息,由若干个 message 组成,每个 message 之间用\n\n 分隔。每个 message 内部由若干行组成,每一行都是如下格式。[field]: value\n上面的 field 可以取四个值。dataeventidretrysse 数据格式可以看看 阮一峰老师的这篇文章 ,写的还是挺详细的。如何解析 sse 数据
sse 数据如何进行解析呢? 可以使用eventsource-parser
这个库,这个库就是用来解析 sse 数据的。pnpm add eventsource-parserfetch.body 返回的是 Uint8Array ,还是使用 TransformStream,在 TransformStream 中进行 sse 数据的解析。-
创建 TransformStream 将 Uint8Array 格式先转成 string 格式,解析 sse 数据之后再进一步把 content 读取出来。
-
然后延用上面的
createCallbacksTransformer
将 string 格式再转成 Uint8Array 格式。
先新建函数createEventStreamTransformer
:// libs/ai-stream.tsimport {createParser,type EventSourceParser,type ParseEvent,} from 'eventsource-parser';export function createEventStreamTransformer(customParser: AIStreamParser) {const decoder = new TextDecoder();let parser: EventSourceParser;return new TransformStream<Uint8Array, string>({async start(controller) {function onParse(event: ParseEvent) {if (event.type === 'event') {const data = event.data; // data 就是解析之后的每个数据if (data === '[DONE]') {// 如果是[DONE] 表示解析完毕。controller.terminate();return;}// 将 json 数据字符串传入到自定义解析器进行过滤const message = customParser(data);// 如果 message 有值的话,就推到队列中。if (message) controller.enqueue(message);}}// 创建解析器parser = createParser(onParse);},transform(chunk) {parser.feed(decoder.decode(chunk));},});}在 start 方法中,创建了 parser,用来解析数据流中的 sse 数据。在 transform 方法中,此时 chunk 是 Uint8Array 类型,先使用 TextDecoder 将 Uint8Array 解析成字符串,然后通过 feed 进行 sse 数据的解析,解析后的数据传递给 onParse 函数。这里为了逻辑解耦,使用了 customParser 自定义解析器,传入 customParser 来实现解析具体的逻辑(下面会介绍)。流可以使用多次 pipeThrough,进行多次转化。// libs/ai-stream.tsexport function AIStream(res: Response): ReadableStream {// 省略其他代码...return stream.pipeThrough(createEventStreamTransformer()).pipeThrough(createCallbacksTransformer());}-
第一次 pipeThrough:将 Uint8Array -> string。
-
具体的这一步流程是:Uint8Array -> eventsource-parser 解析 sse 数据 -> 走到 onParse 函数 -> customParser(sse 数据 json 字符串) -> string
-
-
第二次 pipeThrough:将第一次 pipeThrough 的结果 string -> Uint8Array。
AIStream 最终返回的依然是 Uint8Array 格式的流。自定义解析器
createEventStreamTransformer
方法传入了自定义解析器,customParser 该怎么写呢?libs/ai-stream.ts
文件就只负责流的处理,将 customParser 作为参数传入即可,具体如何解析取决于上层应用。// libs/ai-stream.tsexport function AIStream(res: Response,customParser: AIStreamParser // 新增): ReadableStream {// 省略其他代码...return stream.pipeThrough(createEventStreamTransformer(customParser)) // 传入.pipeThrough(createCallbacksTransformer());}新建文件libs/openai-stream.ts
实现 openai 流的 customParser。// libs/openai-stream.tsimport { AIStream } from './ai-stream';export function trimStartOfStreamHelper() {let start = true;return (text: string) => {if (start) text = text.trimStart();if (text) start = false;return text;};}function parseOpenAIStream(): (data: string) => string | void {const trimStartOfStream = trimStartOfStreamHelper();return (data) => {const json = JSON.parse(data); // 将 json 字符串解析成对象const text = trimStartOfStream(json.choices[0]?.delta?.content ?? json.choices[0]?.text ?? ''); // 读取对应的字段return text;};}export function OpenAIStream(res: Response): ReadableStream {return AIStream(res, parseOpenAIStream());}一个消息头部可能有多个空格,使用trimStartOfStreamHelper
辅助函数把 chunk 最前面的空格给去掉。因为 data 传入的时候已经是 sse 的数据部分,是 json 字符串,可以使用JSON.parse
的方式来解析成对象,最后在读取相对应的字段即可。再更新一下app/api/chat/route.ts
文件,将 AIStream 替换成 OpenAIStream。// app/api/chat/route.tsexport async function POST(req: Request) {// 省略其他代码...const stream = OpenAIStream(response); // AIStream() -> OpenAIStream()return new StreamingTextResponse(stream);}再来看看效果🎉🎉🎉 牛哇,实现啦!!! 🎉🎉🎉实现 regenerate
如果对当前生成的结果不满意,重新生成新的结果。// hooks/use-chat.tsexport type UseChatHelpers = {reload: () => Promise<string | null | undefined>;};export function useChat({api = '/api/chat',id,initialInput = '',initialMessages = [],}: UseChatOptions = {}): UseChatHelpers {// 省略其他代码...const reload = useCallback(async () => {if (messagesRef.current.length === 0) return null;const lastMessage = messagesRef.current[messagesRef.current.length - 1];// 如果最后一条消息是 chatgpt 生成的if (lastMessage.role === 'assistant') {// 去掉消息列表的最后一条消息,然后触发接口请求return trigger(messagesRef.current.slice(0, -1));}return trigger(messagesRef.current);}, [trigger]);return {// 省略其他的代码...reload,};}新增 react-feather 添加几个好看的图标,顺便也把 input 输入框美化一下。// app/page.tsx// 省略其他代码...import { Pause, Send, RotateCw, MoreHorizontal } from 'react-feather';import classNames from 'classnames';const Chat = () => {// 省略其他代码...const { reload, messages } = useChat();const disabledClassName = isLoading? 'cursor-not-allowed pointer-events-none opacity-70': '';return (// 省略其他代码...<div className="fixed w-full left-0 bottom-0 py-4 bg-gray-100 border-t border-t-gray-300">{messages.length > 0 ? (<buttonclassName="mb-2 mx-auto border border-gray-300 bg-gray-100 text-gray-600 p-2 px-8 rounded-md hover:bg-gray-200 transition-all flex items-center"onClick={reload}><RotateCw className="mr-2" size={16} />重新生成</button>) : null}<formonSubmit={handleSubmit}className="max-w-xl w-full mx-auto relative"><div className="relative"><inputdisabled={isLoading}value={input}onChange={handleInputChange}className={classNames('w-full p-3 focus-visible:outline-gray-300 border border-gray-300 bg-gray-100 rounded-md shadow-xl focus:shadow-2xl transition-all',disabledClassName)}placeholder="随便说点什么..."/>{isLoading ? (<buttonclassName={classNames('absolute right-3 bg-gray-200 p-1 top-1/2 -translate-y-1/2 rounded max-w-xs transition-all',disabledClassName)}><MoreHorizontal size={16} /></button>) : (<button className="absolute right-3 bg-gray-200 hover:text-white p-1 top-1/2 -translate-y-1/2 rounded max-w-xs hover:bg-green-400 transition-all"><Send size={16} /></button>)}</div></form></div>);};看看具体的效果:实现 stop
在hooks/use-chat.ts
中使用了 AbortController 传递给了 fetch 函数,写一个 stop 方法来实现暂停。// hooks/use-chat.tsexport type UseChatHelpers = {stop: () => void;};export function useChat({api = '/api/chat',id,initialInput = '',initialMessages = [],}: UseChatOptions = {}): UseChatHelpers {// 省略其他代码...const stop = useCallback(() => {if (abortControllerRef.current) {// 取消请求abortControllerRef.current.abort();abortControllerRef.current = null;}}, []);return {// 省略其他的代码...stop,};}如果在生成中会出现暂停的图标,再更新一下 UI。// app/page.tsximport { Pause, Send } from 'react-feather';const Chat = () => {// 省略其它代码...const { stop, isLoading, messages } = useChat();const getBtnContent = () => {if (isLoading) {return (<><Pause className="mr-2" size={16} />暂停生成</>);}return (<><RotateCw className="mr-2" size={16} />重新生成</>);};return (// 省略其他代码...<div className="fixed w-full left-0 bottom-0 py-4 bg-gray-100 border-t border-t-gray-300">{messages.length > 0 ? (<buttonclassName="mb-2 mx-auto border border-gray-300 bg-gray-100 text-gray-600 p-2 px-8 rounded-md hover:bg-gray-200 transition-all flex items-center"onClick={isLoading ? stop : reload}>{getBtnContent()}</button>) : null}<formonSubmit={handleSubmit}className="max-w-xl w-full mx-auto relative">{/* 省略其他代码... */}</form></div>);};来看看效果有一点小缺陷:点击 stop 只是前端不再读取网络流,所以只是前端的渲染暂停,但是此时网络流还是没有断,更好的方式应该是发送消息给后端,后端主动中断流,然后再停止前端的渲染。渲染 markdown
openai 是支持写 markdown 格式的,可以引入 markdown -> html 的包进行渲染。需要安装 marked 和 @tailwindcss/typography。-
marked:将 markdown 解析成 html。
-
@tailwindcss/typography:漂亮的文案排版。
pnpm add markedpnpm add @tailwindcss/typography @types/marked -D新建libs/marked.ts
文件,用来写 markdown -> HTML 的方法// libs/marked.tsimport { marked } from 'marked';export const markdownToHTML = (markdown: string) => {if (!markdown || typeof markdown !== 'string') {return '';}return marked.parse(markdown);};@tailwindcss/typography
的配置也比较简单,在tailwind.config.js
的 plugin 配置一下。// tailwind.config.js/** @type {import('tailwindcss').Config} */module.exports = {// 省略其他代码...plugins: [require('@tailwindcss/typography')],};在app/MessageCard.tsx
文件中,给需要进行渲染 markdown html 的内容添加上类名:prose 。// app/MessageCard.tsximport { markdownToHTML } from '@/libs/marked';const MessageCard = ({ message }: MessageCardProps) => {const content = markdownToHTML(message.content);return (<div className="flex items-start"><Avatar role={message.role} /><divclassName="pl-2 leading-6 prose transition-all max-w-xl" // 在这里!dangerouslySetInnerHTML={{ __html: content }}></div></div>);};来看看效果:保持滚动条在底部
如果渲染的消息过长,或者消息过多时,得手动进行滚动,写一个 hook 要滚动条一直维持在底部。新建 hook:hooks/use-scroll-bottom
。需要先安装 lodash.throttlepnpm add lodash.throttle// hooks/use-scroll-bottom.tsimport throttle from 'lodash.throttle';import { RefObject, useEffect } from 'react';interface UseScrollBottomOptions {scrollRef: RefObject<HTMLElement>;}const useScrollBottom = ({ scrollRef }: UseScrollBottomOptions) => {useEffect(() => {const scrollingElement = scrollRef.current;const callback: MutationCallback = function (mutationsList) {for (let mutation of mutationsList) {if (mutation.type === 'childList') {window.scrollTo(0, document.body.scrollHeight);}}};const throttleCallback = throttle(callback, 1000 / 16);const observer = new MutationObserver(throttleCallback);if (scrollingElement) {observer.observe(scrollingElement!, {subtree: true,childList: true,});}return () => {observer.disconnect();};}, []);};export default useScrollBottom;在app/page.tsx
中导入进行使用即可。// app/page.tsxconst Chat = () => {const scrollRef = useRef<HTMLUListElement | null>(null);useScrollBottom({ scrollRef });return (<ul className="space-y-4" ref={scrollRef}>// 绑定ref{messages.map((message) => (<MessageCard key={message.id} message={message} />))}</ul>);}; -
-