添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接
序语程言

1
pip install fastapi
pip install uvicorn

1
from fastapi import FastAPI, Request  
app = FastAPI()


# GET / answers with a fixed greeting, serialized by FastAPI as JSON.
@app.get("/")
async def root():
    greeting = {"message": "Hello World"}
    return greeting

 1
from fastapi import FastAPI, Request  
from sse_starlette.sse import EventSourceResponse  
import asyncio  
app = FastAPI()


# GET / streams a fixed sentence to the client, one character per
# Server-Sent Event, via sse-starlette's EventSourceResponse.
@app.get("/")
async def root(request: Request):
    async def event_generator(request: Request):
        """Yield one event dict per character until done or disconnected."""
        res_str = "七夕情人节即将来临,我们为您准备了精美的鲜花和美味的蛋糕"
        for i in res_str:
            # Stop producing events as soon as the client has gone away.
            if await request.is_disconnected():
                print("连接已中断")
                break
            yield {
                "event": "message",
                "retry": 15000,  # reconnection delay hint, in milliseconds
                "data": i,
            }
            # Pace the stream so the characters arrive one at a time.
            await asyncio.sleep(0.1)

    g = event_generator(request)
    return EventSourceResponse(g)

 1
# GET / streams the same sentence, but formats the Server-Sent Events by hand
# and sends them through a plain StreamingResponse.
@app.get("/")
async def root(request: Request):
    # Imported here because nothing else in the file brings it into scope.
    from fastapi.responses import StreamingResponse

    async def event_generator(request: Request):
        """Yield one hand-formatted SSE message per character."""
        res_str = "七夕情人节即将来临,我们为您准备了精美的鲜花和美味的蛋糕"
        for i in res_str:
            # Stop producing events as soon as the client has gone away.
            if await request.is_disconnected():
                print("连接已中断")
                break
            # SSE wire format: unquoted "field: value" lines, with a blank
            # line terminating each event.
            data = f"event: message\nretry: 15000\ndata: {i}\n\n"
            yield data
            # Pace the stream so the characters arrive one at a time.
            await asyncio.sleep(0.1)

    g = event_generator(request)
    return StreamingResponse(g, media_type="text/event-stream")

1
event: message\r\ndata: \xe4\xb8\x83\r\nretry: 15000\r\n\r\n
event: message\r\ndata: \xe5\xa4\x95\r\nretry: 15000\r\n\r\n

 1
import requests  
def test():
    """Fetch the SSE endpoint with ``requests`` and print each decoded chunk."""
    url = r"http://127.0.0.1:8000/"
    # A GET carries no body, so the wanted media type is announced via Accept
    # rather than Content-Type.
    headers = {'Accept': 'text/event-stream'}
    # With stream=True the connection stays open until the response is closed;
    # the context manager guarantees it is released even if printing fails.
    with requests.get(url, headers=headers, stream=True) as response:
        for chunk in response.iter_content(chunk_size=1024, decode_unicode=True):
            print(chunk)


if __name__ == '__main__':
    test()

1
yield {
    "event": "message",
    "retry": 15000,
    "data": i
}

 1
event: message
data: 七
retry: 15000
event: message
data: 夕
retry: 15000
event: message
data: 情
retry: 15000

 1
async def test():
    """Read the SSE endpoint with aiohttp and print the text as it arrives."""
    import codecs

    # A GET carries no body, so the wanted media type is announced via Accept.
    headers = {'Accept': 'text/event-stream'}
    # iter_any() returns whatever bytes have arrived, so a multi-byte UTF-8
    # character can be split across two chunks; the incremental decoder keeps
    # the partial bytes until the rest shows up instead of raising.
    utf8_decoder = codecs.getincrementaldecoder("utf-8")()
    sseresp = aiohttp.request("GET", r"http://127.0.0.1:8000/", headers=headers)
    async with sseresp as r:
        async for chunk in r.content.iter_any():
            text = utf8_decoder.decode(chunk)
            if text:
                print(text)


if __name__ == '__main__':
    # asyncio.run creates and closes the event loop itself.
    asyncio.run(test())

 1
from fastapi import FastAPI, Request  
from sse_starlette.sse import EventSourceResponse  
import asyncio  
from pydantic import BaseModel  
app = FastAPI()


# Request body schema for POST /sse.
class Message(BaseModel):
    message: str  # text that is streamed back one character at a time


# POST /sse streams the posted message back to the client, one character per
# Server-Sent Event.
@app.post("/sse")
async def indexpost(msg: Message, req: Request):
    async def event_generator(request: Request):
        """Yield one event dict per character until done or disconnected."""
        res_str = msg.message
        for i in res_str:
            # Stop producing events as soon as the client has gone away.
            if await request.is_disconnected():
                print("连接已中断")
                break
            yield {
                "event": "message",
                "retry": 15000,  # reconnection delay hint, in milliseconds
                "data": i,
            }
            # Pace the stream so the characters arrive one at a time.
            await asyncio.sleep(0.1)

    return EventSourceResponse(event_generator(req))

 1
import aiohttp  
import asyncio  
import json  
async def test_post():
    """POST a message to /sse with aiohttp and print the streamed reply."""
    import codecs

    headers = {'Content-Type': 'application/json'}
    data = {"message": "七夕情人节快乐!"}
    # iter_any() returns whatever bytes have arrived, so a multi-byte UTF-8
    # character can be split across two chunks; the incremental decoder keeps
    # the partial bytes until the rest shows up instead of raising.
    utf8_decoder = codecs.getincrementaldecoder("utf-8")()
    sseresp = aiohttp.request("POST", r"http://127.0.0.1:8000/sse", headers=headers, data=json.dumps(data))
    async with sseresp as r:
        async for chunk in r.content.iter_any():
            text = utf8_decoder.decode(chunk)
            if text:
                print(text)


if __name__ == '__main__':
    # asyncio.run creates and closes the event loop itself.
    asyncio.run(test_post())

1
// Open a Server-Sent Events connection from the browser; 'http_api_url' is a
// placeholder for the real endpoint. withCredentials: true asks the browser to
// include credentials (such as cookies) on cross-origin requests.
const eventSource = new EventSource('http_api_url', { withCredentials: true })