添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

前记

Python Asyncio 不仅提供了简单的 Socket 接口,还基于 Asyncio.Socket 提供了 Protocol & Transport 接口以及更高级的 Stream 接口,这些接口大大的减轻了开发者进行网络编程的心理负担。
本文主要介绍了 Asyncio 这些接口的简单使用以及对应的原理分析。

1.简单介绍

Python Asyncio 提供了一套完整的高性能网络编程接口,它包括了兼容位于网络编程最底层的 Socket Asyncio.Socket ,以及在 Asyncio.Socket 上层封装的 Protocol & Transport 接口,还有在 Protocol & Transport 上层封装的 Stream 接口。
这三套接口各有特色,开发者可以根据自己的需求选择其中一套接口来使用,进而减少网络编程的一些心理负担。

Python Asyncio 三套接口的关系就跟套娃一样, Stream 套在 Protocol & Transport 上面 ,而 Protocol & Transport 套在 Socket 上面,由于 Stream 是最上层的封装,所以它的易用性最高,不过适用范围最少,其次是 Protocol & Transport ,最后是 Socket ,它的易用性最差,但是适用范围最广,不过它们的性能却跟套娃顺序无关。
根据 uvloop 的性能比较得出他们的性能关系为 Protocol & Transport > Stream > Socket ,具体结果如图:

在第一次见到这个性能的比较结果时我觉得是非常神奇的,因为对于一些分层架构来说,越上层的封装越多,易用性越好,而性能反而越低,但在性能比较结果中性能最好的却是处于中间的 Protocol & Transport ,然后是 Stream ,最后才是 Asycnio Socket 。为了了解这个原因,需要通过网络编程接口的使用方法和源码一起分析。

1.1.Socket的简单介绍

无论 Asyncio 的网络编程接口是怎么封装,如果要了解它是怎么实现的,那么需要对 Socket 有一定的了解。
不过本文只对 Socket 进行简单的介绍,并不会对 Socket 的原理进行详细的描述,同时 Python Asyncio Stream 接口只支持流传输,所以本文只采用 Socket 进行TCP传输的编程实例进行讲述,其他的编程方式和 Socket 介绍见下文:

  • Python Socket 编程详细介绍
  • Socket Programming in Python (Guide)
  • Socket 是计算机之间进行通信的一种协议,通过 Socket 开发者可以在无需关心底层是如何实现的情况下在不同的计算机进行端到端之间的通信,
    Socket 常见的交互流程如下图:
    1692807503149asyncio网络编程-socket.png

    在交互的流程的示例图中, Socket 分为五个交互阶段,每个阶段的作用如下:

  • 创建 Socket : 初始化一个 Socket 对象。
  • 初始化 Socket : 客户端无需任何操作,而服务端的 Socket 在初始化时比客户端的 Socket 多了两个方法– bind listen ,它们的作用分别是绑定一个端口,以及监听这个端口建立的新连接。
  • 建立连接: 客户端 Socket 的专属方法为 connect ,这个方法会直接与服务端建立连接。而服务端的专属方法为 accept accept 这个方法比较特殊,因为其他 socket 的方法都是针对于 socket 进行操作,而 accept 方法除了针对 socket 进行操作外还会额外返回一个新的 socket
    同时服务端原先的 socket 只携带服务端的IP和地址信息,而新的 socket 携带的是客户端与服务端两个端点的四元组信息(客户端IP,客户端端口,服务端IP,服务端端口)。
    这一点是非常重要的,因为这两个 socket 对应的文件描述符是不一样的,它们的责任也是不一样的,
    原来的 socket 只用于跟客户端建立新的连接,而新的 socket 用于客户端与服务端进行数据交互,这意味着服务端的事件循环在处理的时候,对两个 socket 的读事件的触发时机也是不一样的。其中服务端原先 socket 的读事件被触发时意味着有新的连接可以被 accept ,而新 socket 的读事件被触发则是代表当前连接有新的数据可以被接收,这与客户端的 Socket 读事件的一样的,这意味着在 Socket 建立成功后,客户端和服务端的连接的读写逻辑都可以统一,不用进行区分了。
  • 数据交互阶段:由于服务端 accept 方法返回的 Socket 与客户端的类似,所以这个阶段的客户端与服务端的逻辑是类似的,不过双端程序的数据只是与各自的 Socket 进行交互,而不是直接进行交互的。因为每个 Socket 都维护着读和写两个缓冲区,缓冲区的底层数据结构与队列类似,创建 Socket 的程序只能把数据投递到缓冲区或者从缓冲区获取数据,而无法触碰到网卡发送/接收数据的领域。
    这也意味着在把 Socket 设置为非阻塞的情况下,当 Socket 的写缓冲区不满时, Socket 的写操作是不会阻塞的,同样当 Socket 的读缓冲区拥有的量大于 Socket 读方法需要的量时,读操作也是不会阻塞的。
  • 关闭阶段:由于 Socket 有两个缓冲区,所以关闭阶段分为 close shutdowm 两个方法,其中 close 为关闭两个缓冲区,而 shuwdown 可以关闭指定的缓冲区(详细的流程见后文)。示例中的例子是服务端先调用了 close 方法,然后服务端会发送一个 EOF 事件给客户端,客户端从读缓冲区读到 EOF 事件后发现读通道已经关闭了,才调用 close 方法把整个 socket 一起关闭。
  • 2.Asyncio Socket

    在文章 《初识Python协程的实现》 中介绍了如何把同步请求通过 selector 库和 yield 语法改造成一个简单的基于协程的异步请求,但是改造后的代码增加了很多监听和移除文件描述符的回调代码,编写起来比较麻烦,很不易懂。

    不过在采用了 Asyncio 的思想并引入了 Task Future 后,异步回调的代码都被消除了,但是大量的监听和移除文件描述符的代码还是存在,而 Asyncio.Socket 则封装了大量的读写事件的监听和移除的操作,只暴露了与 Socket 类似的方法,开发者通过这些方法可以简单快速的把同步请求改为基于协程的异步请求,比如 《初识Python协程的实现》 中的同步请求,它的源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import socket


    def request(host: str) -> None:
    url: str = f"http://{host}"
    sock: socket.SocketType = socket.socket()
    sock.connect((host, 80))
    sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))

    response_bytes: bytes = b""
    chunk: bytes = sock.recv(4096)
    while chunk:
    response_bytes += chunk
    chunk = sock.recv(4096)
    print("\n".join([i for i in response_bytes.decode().split("\r\n")]))


    if __name__ == "__main__":
    request("so1n.me")

    这份代码只对 Socket 进行简单的调用,其中涉及到 Socket 的调用方法有:

    是否涉及到IO socket.socket 初始化socket socket.connect socket.send socket.recv

    在把它改为 Asyncio.Socket 时,只需要把涉及到IO的 Socket 方法以 loop.sock_xxx(sock, *param) 的形式进行修改,其中 xxx 是原来的方法名, sock 则是通过 socket.socket 实例化的一个 sock 对象,而 param 则保持跟之前的一样的参数,更改后的代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    import asyncio
    import socket


    async def request(host: str) -> None:
    url: str = f"http://{host}"
    loop = asyncio.get_event_loop()
    sock: socket.SocketType = socket.socket()
    await loop.sock_connect(sock, (host, 80))
    await loop.sock_sendall(sock, f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))

    response_bytes: bytes = b""
    chunk: bytes = await loop.sock_recv(sock, 4096)
    while chunk:
    response_bytes += chunk
    chunk = await loop.sock_recv(sock, 4096)
    print("\n".join([i for i in response_bytes.decode().split("\r\n")]))


    if __name__ == "__main__":
    asyncio.run(request("so1n.me"))

    Asyncio Socket 没有提供 send 方法,这里需要改为 sendall

    可以看到,代码的改动并没有很大,除了传染性的 async await 语法外,其它逻辑并没有什么明显的变化,在运行代码之后可以看到程序运行成功,并输出如下响应结果:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    HTTP/1.1 301 Moved Permanently
    Connection: close
    Content-Length: 162
    Server: GitHub.com
    Content-Type: text/html
    Location: https://so1n.me/
    X-GitHub-Request-Id: 9E20:2767:4FED3D:55C800:64E46CAF
    Accept-Ranges: bytes
    Date: Tue, 22 Aug 2023 08:11:04 GMT
    Via: 1.1 varnish
    Age: 233
    X-Served-By: cache-hkg17935-HKG
    X-Cache: HIT
    X-Cache-Hits: 1
    X-Timer: S1692691865.899948,VS0,VE1
    Vary: Accept-Encoding
    X-Fastly-Request-ID: 7180dce567d15eacaf44c9b93a2fb84bd67ab444

    <html>
    <head><title>301 Moved Permanently</title></head>
    <body>
    <center><h1>301 Moved Permanently</h1></center>
    <hr><center>nginx</center>
    </body>
    </html>

    可以看到程序是正常运行的,为了了解 Asyncio.Socket 做了什么工作,接下来会翻阅源码,探究它的处理方法,首先是 loop.sock_connect ,它的源码如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    # 位于:Lib/asyncio/selector_events.py
    async def sock_connect(self, sock, address):
    # 检查ssl sock以及检查是否为阻塞的sock
    base_events._check_ssl_socket(sock)
    if self._debug and sock.gettimeout() != 0:
    raise ValueError("the socket must be non-blocking")

    if sock.family == socket.AF_INET or (
    base_events._HAS_IPv6 and sock.family == socket.AF_INET6):
    # 通过dns将域名转为ip地址
    resolved = await self._ensure_resolved(
    address, family=sock.family, type=sock.type, proto=sock.proto,
    loop=self,
    )
    _, _, _, _, address = resolved[0]

    # 创建一个future,这个future会等待soc连连接成功才返回数据。
    fut = self.create_future()
    self._sock_connect(fut, sock, address)
    # 通过future等待soc床创建成功
    return await fut

    这个方法分为三部分,首先是检查 Socket 的ssl并进行一些参数校验,然后通过 self._ensure_resolved 方法进行dns解析,最后才通过 self._sock_connect 方法进行真正建立连接。其中,dns解析方法 self._ensure_resolved sock_connect 方法与其他 Socket 方法的不同点,它的源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    # 位于:Lib/asyncio/selector_events.py
    async def _ensure_resolved(self, address, *, family=0, type=socket.SOCK_STREAM, proto=0, flags=0, loop):
    host, port = address[:2]
    # 判断是否已经解析,已经解析了就直接使用
    info = _ipaddr_info(host, port, family, type, proto, *address[2:])
    if info is not None:
    # "host" is already a resolved IP.
    return [info]
    else:
    # 没有解析则调用socket.getaddrinfo进行解析
    return await loop.getaddrinfo(host, port, family=family, type=type, proto=proto, flags=flags)

    # 位于:Lib/asyncio/base_events.py
    async def getaddrinfo(self, host, port, *, family=0, type=0, proto=0, flags=0):
    if self._debug:
    getaddr_func = self._getaddrinfo_debug
    else:
    getaddr_func = socket.getaddrinfo

    return await self.run_in_executor(
    None, getaddr_func, host, port, family, type, proto, flags)

    通过源码发现,dns解析的逻辑中涉及到了 run_in_executor 方法,这个方法是把任务交给线程池进行处理。
    在这里使用 run_in_executor 方法的原因是 POSIX 的DNS解析API是阻塞的,且没有提供异步选项,如果直接执行这个方法会卡住整个 Asyncio Event Loop 的运行,所以只能通过线程去调用这个API完成DNS解析,
    不过 Asyncio 的默认线程池数量很小,如果是做爬虫类等需要频繁的进行DNS解析的项目,需要把默认的线程池改大一些。

    uvloop使用的libuv也选择了POSIX API,它的工作原理也是通过线程去执行DNS解析,详情见 why libuv do DNS request by multiple thread

    在通过DNS进行地址解析后就拿到了真正的地址,这时可以开始进行真正的连接了,此时会调用 _sock_connect 方法去建立连接,它的源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    # 位于:Lib/asyncio/selector_events.py
    def _sock_connect(self, fut, sock, address):
    fd = sock.fileno()
    try:
    sock.connect(address)
    except (BlockingIOError, InterruptedError):
    # 检查对应的文件描述符对应的连接是否还在运行
    # 如果还在运行则抛错
    self._ensure_fd_no_transport(fd)
    # 注册文件描述符的读回调
    handle = self._add_writer(fd, self._sock_connect_cb, fut, sock, address)
    # 通过fut添加fut完成时移除监听的回调
    fut.add_done_callback(functools.partial(self._sock_write_done, fd, handle=handle))
    except (SystemExit, KeyboardInterrupt):
    raise
    except BaseException as exc:
    fut.set_exception(exc)
    else:
    fut.set_result(None)

    def _sock_write_done(self, fd, fut, handle=None):
    # 移除事件的监听
    if handle is None or not handle.cancelled():
    self.remove_writer(fd)

    def _sock_connect_cb(self, fut, sock, address):
    if fut.done():
    return
    # 当有写事件除非时代表着连接建立这个行为已经执行完毕了,但是需要检查建立结果
    try:
    err = sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err != 0:
    raise OSError(err, f'Connect call failed {address}')
    except (BlockingIOError, InterruptedError):
    pass
    except (SystemExit, KeyboardInterrupt):
    raise
    except BaseException as exc:
    fut.set_exception(exc)
    else:
    fut.set_result(None)

    这个方法的执行逻辑与其他的 Socket 方法的执行逻辑是类似,它们都会先尝试去执行 Socket 原先的方法,这时候如果操作系统准备就绪,那么意味着可以无阻塞的执行了 Socket 方法,否则是操作系统尚未准备好,需要捕获异常并对异常进行处理。

    在捕获到异常后,会通过 _add_writer 添加了一个写事件的回调 _sock_connect_cb ,再通过fut添加一个fut完成时的 _sock_write_done 回调,然后就会把控制权交给了事件循环。
    当事件循环发现文件描述符有事件被触发时,会调用 _sock_connect_cb 获取建立连接的结果,如果结果有异常,则把异常添加到fut中,否则就把结果放置到fut中,这样fut都会从 peding 状态变为 done ,fut也就会触发 _sock_write_done 移除掉事件循环对文件描述符的监听。

    _add_reader _remove_reader _add_writer _remove_writer Asyncio socket selector 直接交互封装的方法,可以通过文章 《Python Asyncio调度原理》 进行了解。

    除了 Socket.connect 方法外,与 Asyncio.Socket 相关的方法还有很多,不过原理是一样的,它们的流程都可以简化为下图:
    asyncio网络编程-asycnio-socket的执行逻辑.png

    该图展示的是每个 Asyncio.Socket 方法的核心逻辑,各个方法具体的执行逻辑可以通过 源码 进行详细的了解,常见的 socket 方法与 Asyncio Socket 方法对照表如下:

    sock方法 Asyncio Sock方法 listen accept sock_accept connect sock_connect connect_ex sock_recv recv_info sock_recv_into recvfrom sock_recvfrom sendto sock_sendto

    3.Protocol&Transport

    Asyncio.Socket 提供了 Socket 的调用方法的封装,但是如果直接基于 Socket 进行网络编程仍然会比较复杂,特别是TCP网络编程还需要处理很多东西。
    为此 Asyncio 提供了一套 Protocol & Transport 接口,它们面向开发者提供的方法都会屏蔽底层的 Socket 细节,并基于TCP或UDP协议封装了一些方法调用,开发者只要根据 Protocol & Transport 协定的几个方法就可以快速开发出一个能够稳定使用的TCP服务。

    Asyncio 的定义中, Protocol & Transport 是无法分开的一个整体,它们一起定义了网络I/O和进程间I/O的抽象接口,对于开发者来说可以简单的把 Protocol 理解为专门负责处理被动调用的,也就是连接什么时候建立,连接什么时候接收了数据;而 Transport 则是提供了许多开发者可以主动调用的接口,包括了向连接发送数据,关闭连接等等。

    3.1使用示例

    官方的 Protocol & Transport 的示例是 TCP Echo ,在这个示例中,服务端会接收客户端的消息并返回给客户端一样的消息,然后再关闭连接。而客户端会发送消息,并在接收消息后等待被关闭,其中服务端的示例代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    import asyncio


    class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
    # transport创建成功后的回调,这里需要跟Protocol进行绑定
    peername = transport.get_extra_info('peername')
    print('Connection from {}'.format(peername))
    self.transport = transport

    def data_received(self, data):
    # socket收到数据的回调事件,这里先接收消息,再把内容返回给客户端最后再关闭
    message = data.decode()
    print('Data received: {!r}'.format(message))

    print('Send: {!r}'.format(message))
    self.transport.write(data)

    print('Close the client socket')
    self.transport.close()


    async def main():
    server = await asyncio.get_running_loop().create_server(
    lambda: EchoServerProtocol(), '127.0.0.1', 8888
    )
    async with server:
    await server.serve_forever()


    asyncio.run(main())

    这份示例代码分为两大部分,一部分是 main 函数,它主要的工作是通过 create_server 创建一个TCP服务并通过 server.serve_forever() 运行服务。另一部分是 EchoServerProtocol ,它主要是在TCP服务 accept 了请求后被创建的,除此之外, EchoServerProtocol 在收到请求数据后会把对应的数据通过 Protocol 的不同方法传递给使用者,而客户端的行为则不一样,它对应的源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    import asyncio


    class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, on_con_lost, received_queue):
    # 这里的on_con_lost是一个future对象,用于通知连接已经断开
    self.on_con_lost = on_con_lost
    # 这里的received_queue是一个queue对象,用于接收消息
    self.received_queue = received_queue

    def connection_made(self, transport):
    # transport创建成功后的回调,这里需要跟Protocol进行绑定
    self.transport = transport

    def data_received(self, data):
    self.received_queue.put_nowait(data.decode())

    def connection_lost(self, exc):
    # socket连接断开的回调事件,这里需要通知on_con_lost
    self.on_con_lost.set_result(True)


    async def main():
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()
    received_queue = asyncio.Queue()

    # 初始化连接并接收返回的transport和protocol
    transport, protocol = await loop.create_connection(
    lambda: EchoClientProtocol(on_con_lost, received_queue),
    '127.0.0.1', 8888)

    # 通过transport发送消息,写入消息是直接投递到系统的socket缓冲区,不会阻塞
    transport.write('Hello World!'.encode())
    # 等待消息的返回
    rece_msg = await received_queue.get()
    print("Data received:" + rece_msg)
    try:
    # 等待连接被远方断开
    await on_con_lost
    finally:
    # 确保连接被关闭
    transport.close()


    asyncio.run(main())

    通过源码可以看到客户端也分为两部分,首先是 EchoClientProtocol ,它与服务端的 Protocol 类似,主要的区别是在初始化时的多了 on_con_lost received_queue 参数,通过它们可以把异步回调转为同步调用。其中 on_con_lost 是用于监听连接什么时候丢失, received_queue 则是接收了来自 Socket 的数据以供调用者获取。
    另一部分的 main 函数的不同点是先通过 create_connection 建立了连接并返回了 Protocol & Transport ,在建立连接成功后使用者可以通过 Transport 发送数据,或者通过 received_queue 获取数据以及通过 on_con_lost 等待连接被断开。

    在把客户端与服务端的代码编写完毕后,先运行服务端的代码,然后再运行客户端的代码,会发现服务端打印了如下数据:

    1
    2
    3
    4
    Connection from ('127.0.0.1', 49072)
    Data received: 'Hello World!'
    Send: 'Hello World!'
    Close the client socket

    而客户端打印了如下数据:

    1
    Data received:Hello World!

    通过输出结果可以看到示例代码是正常运行的,它们基于 Protocol & Transport 完成了数据的传输,
    同时在运行的过程中涵盖了 Protocol & Transport 的主要方法,对应的执行过程如下图:
    1692808529149asyncio网络编程-ProtocolTransport.png

    图中 loop 代表事件循环的方法, t 代表 Transport 的方法, p 则代表 Protoccol 的方法,通过图的执行过程可以看到除了 loop 的方法是在进行初始化外,不管对于客户端还是服务端, Protocol 负责做回调的事情, Transport 则是做主动调用的事情。此外客户端和服务端的 Protocol & Transport 在建立连接和数据交互阶段的作用是一致的,可以看出 Protocol & Transport 的逻辑是客户端和服务端共享的。

    不过再回过头看示例代码则可以发现示例代码基于它们进行拓展开发的方式有所不同,对于服务端,由于它得等到客户端调用才能开始处理请求,它属于被动的一方,所以对服务端进行拓展开发时需要在它们的回调事件中编写对应的业务代码,并对 Transport 进行调用。
    而客户端则处于主动的一方,需要初始化一些容器把异步回调变为同步调用,比如 asyncio.Future asyncio.Queue 等交给 Protocol 接收数据,然后与 loop.create_connection 返回的 transport 一起进行主动调用。

    3.2.源码分析

    在了解了 Protocol & Transport 的使用方法后,可以发现在 Protocol & Transport 中已经看不到 Socket 的影子了,需要通过对 Protocol & Transport 的源码进行分析,才能了解它性能更强的秘密。

    在示例代码中可以看到客户端与服务端使用的 Protocol & Transport 是一致的,所以先从它们各自的初始化方法 create_connection create_server 开始下手。
    其中 create_connection 方法非常简单,它先是对 Socket 和其他参数进行校验,接着再进行DNS解析以及通过 Happy Eyeballs 快速的选择IP地址,然后再调用 _create_connection_transport 方法。

    _create_connection_transport 方法只是创建并返回 Transport 实例和 Protocol 实例,其中 Protocol 实例是通过用户传递的 Protocol 构造函数创建的, Transport 则是由对应的事件循环创建的。

  • 快乐眼球算法见: Happy Eyeballs
  • create_connection 的源码比较简单,具体见: https://github.com/python/cpython/blob/677320348728ce058fa3579017e985af74a236d4/Lib/asyncio/base_events.py#L976
  • 对于服务端的 create_server 方法,它的处理逻辑一开始也是跟 create_connection 方法类似,也是先对 socket 和其他参数进行校验,然后再把参数和 Socket 放到 Server 实例中,接着再调用 Server 实例的 serve_forever 方法启动服务,而 serve_forever 的主要方法会调用到 _start_serving 方法(asyncio/base_events.py),如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # 位于:Lib/asyncio/selector_events.py
    def _start_serving(self):
    if self._serving:
    return
    self._serving = True
    for sock in self._sockets:
    sock.listen(self._backlog) # <--重点,后面有说
    self._loop._start_serving(
    self._protocol_factory, sock, self._ssl_context,
    self, self._backlog, self._ssl_handshake_timeout,
    self._ssl_shutdown_timeout)

    该方法会对所有托管的 socket 进行 listen 操作,并调用事件循环的 _start_serving 方法(asyncio/selector_events.py)为 Socket 向事件循环注册对应的可读事件回调,源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    # 位于:Lib/asyncio/selector_events.py
    def _start_serving(self, protocol_factory, sock,
    sslcontext=None, server=None, backlog=100,
    ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
    ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
    self._add_reader(sock.fileno(), self._accept_connection,
    protocol_factory, sock, sslcontext, server, backlog,
    ssl_handshake_timeout, ssl_shutdown_timeout)

    通过源码可以看到这个方法是添加 Socket 文件描述符的可读事件回调,在添加之后每当 Socket 与客户端建立连接时,事件循环就会发现并调用 _accept_connection 方法, _accept_connection 方法的源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    # 位于:Lib/asyncio/selector_events.py
    def _accept_connection(
    self, protocol_factory, sock,
    sslcontext=None, server=None, backlog=100,
    ssl_handshake_timeout=constants.SSL_HANDSHAKE_TIMEOUT,
    ssl_shutdown_timeout=constants.SSL_SHUTDOWN_TIMEOUT):
    # 执行n次accept,直到没有新的请求建立或者执行完毕
    for _ in range(backlog):
    try:
    conn, addr = sock.accept()
    conn.setblocking(False)
    except (BlockingIOError, InterruptedError, ConnectionAbortedError):
    # socket accept的缓冲区为空,所以不用处理了
    return None
    except OSError as exc:
    # 如果有错误,则应该移除监听,再重新调用`_start_serving`进行监听
    if exc.errno in (errno.EMFILE, errno.ENFILE,
    errno.ENOBUFS, errno.ENOMEM):
    # 兼容accept多次后Linux一直将文件描述符标记为准备就绪的bug,先移除监听再重新监听
    self.call_exception_handler({
    'message': 'socket.accept() out of system resource',
    'exception': exc,
    'socket': trsock.TransportSocket(sock),
    })
    self._remove_reader(sock.fileno())
    self.call_later(constants.ACCEPT_RETRY_DELAY,
    self._start_serving,
    protocol_factory, sock, sslcontext, server,
    backlog, ssl_handshake_timeout,
    ssl_shutdown_timeout)
    else:
    raise # The event loop will catch, log and ignore it.
    else:
    extra = {'peername': addr}
    # 每个acccept都可以认为是一个新的客户端建立的请求,为了提高并发性,需要创建一个新的协程去处理。
    accept = self._accept_connection2(
    protocol_factory, conn, extra, sslcontext, server,
    ssl_handshake_timeout, ssl_shutdown_timeout)
    self.create_task(accept)

    通过源码可以发现 _accept_connection 方法的主要作用就是同时处理backlog个 socket accept 方法。
    这个处理是专门针对事件循环进行优化的,因为 Socket 在接收到新的请求后会马上通知给事件循环,然后等待事件循环去调用事件对应的回调,虽然 epoll 的处理速度很快了,但是 Socket 收到新的请求与回调的执行仍有一定的延迟。
    如果是触发一次事件就执行一次 accept ,那么处理整个程序 accept 的效率会降低,但是同时处理 Socket 的多个 accept 则可能会使系统的瞬间负载提高,所以这个方法会提供一个 backlog 参数供开发者选择 backlog 的大小以决定每次收到事件后执行多少次 accept 方法。

    那么 backlog 的大小要怎么定义呢,其实这里的 backlog _start_serving 方法(asyncio/base_events.py)中 listen 用到的 backlog 是一样的,而对于 listen 的backlog大小是需要根据场景来进行选择的,在Linux中,默认的backlog为128,而常见的后端服务的应用 Nginx Redis 的默认值为511,这里不对 backlog 进行详细介绍,有兴趣的可以通过以下文章了解:

  • 再聊 TCP backlog
  • What is “backlog” in TCP connections?
  • accept 成功后,则会调用 _accept_connection2 方法, _accept_connection2 与客户端的 _create_connection_transport 一样,它创建了 Transport 实例和 Protocol 实例。不过在服务端中为了提高性能,通常都是一个协程对应一个 Protocol & Transport ,所以是通过 create_task 来执行 _accept_connection2 方法。

    3.2.1.Transport

    create_server create_connection 只是负责对参数的校验以及创建和监听 Socket ,真正负责数据交互的逻辑都藏在 Protocol & Transport 之中。
    Protocol & Transport 的协定中, Transport 可以理解为 Socket 的上层,它负责控制 Socket 的所有行为,包括数据的读,写,限制流的传输还有最重要的是对 Protocol 的流进行控制。比如在对 Tranposrt 进行初始化时,它会在 Socket 创建完毕后调用 Protocol connection_made 方法, Transport 初始化的源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    # 位于:Lib/asyncio/selector_events.py
    # Transport有多个实现,而_SelectorSocketTransport是专门适配UnixSelector事件循环的Transport的实现,它有一定的代表意义。
    class _SelectorSocketTransport(_SelectorTransport):
    def __init__(self, loop, sock, protocol, waiter=None,
    extra=None, server=None):
    self._read_ready_cb = None
    # 初始化参数以及通过`set_protocol`设置Protocol
    super().__init__(loop, sock, protocol, extra, server)
    self._eof = False
    self._paused = False
    self._empty_waiter = None

    # 设置TCP_NODELAY,禁用Nagle算法以提升提升性能
    base_events._set_nodelay(self._sock)
    # transport创建完毕
    self._loop.call_soon(self._protocol.connection_made, self)
    # 注册读事件的回调
    self._loop.call_soon(self._add_reader, self._sock_fd, self._read_ready)
    if waiter is not None:
    self._loop.call_soon(futures._set_result_unless_cancelled, waiter, None)

    def set_protocol(self, protocol):
    # 根据协议设置对应的已读回调
    if isinstance(protocol, protocols.BufferedProtocol):
    self._read_ready_cb = self._read_ready__get_buffer
    else:
    self._read_ready_cb = self._read_ready__data_received

    super().set_protocol(protocol)

    源码中, Transport 在初始化时会先初始化参数并设置已读事件的回调,然后再对 sock 进行处理,在这里只设置TCP_NODELAY为True以禁用Nagle算法。

    Ngale算法是为了优化网络传输而诞生的,但是优化网络传输跟时代是有关系的,在以前网络带宽都比较小的互联网初期,如果都是传输小流量的请求体会比较容易引起网络堵塞。
    比如要传输10个字节长度为1的请求体,它们都会被装载在TCP报文上面,而TCP本身Header的长度在40字节左右,那么此时网络要传输的字节总数为10*(40 + 1) = 410字节。而在应用Ngale算法后,它可以把10个请求体合并在一起,那么传输的字节总数为40 + 1 * 10 = 50字节,可见在使用Ngale算法后可以减少大量的网络传输。

    然而到了现在,大部分设备的网络带宽已经变得很大,而且Nagle算法与delay-ACK搭配会带来网络延迟,这对于类似HTTP1.1的请求来说影响可能不大,但对于那些小频快跑的实时数据交互场景却容易受到Nagle影响进而影响网络传输性能,为此 Transport 在初始化的时候就默认禁用了Nagle算法以减少网络数据交互的延迟。

    这里只做简单易懂的举例和介绍,实际上是比较复杂的,可以通过下面的连接进一步的了解。

  • Nagle算法(英)
  • 关于讨论Asyncio是否默认禁用Nagle算法的Issue(英)
  • Socket 创建完毕后, Transport 会调用 Protocol connection_made 表示 transport 创建完毕,然后再调用 _add_reader 方法向 socket 文件描述符注册了可读事件的回调函数 self._read_ready

    __init__ 方法无法被标记为 async 函数,所以这里使用了一个 waiter 用于标识 __init__ 方法何时执行完毕,使其达到类似 async 函数的实现。

    Transport 创建完毕后,每当 socket 收到一条消息,就会触发一个可读事件,然后事件循环就会执行 self._read_ready 去处理消息。
    这里假设 self._read_ready_cb self._read_ready__data_received ,它对应的源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    # 位于:Lib/asyncio/selector_events.py
    class _SelectorSocketTransport(_SelectorTransport):
    ...
    def _read_ready__data_received(self):
    if self._conn_lost:
    return
    try:
    # 通过sock获获取数据
    data = self._sock.recv(self.max_size)
    # 省去一些异常处理
    except BaseException as exc:
    self._fatal_error(exc, 'Fatal read error on socket transport')
    return

    if not data:
    # 没有数据则代表收到eof
    self._read_ready__on_eof()
    return

    try:
    self._protocol.data_received(data)
    # 省去一些异常处理
    except BaseException as exc:
    self._fatal_error(
    exc, 'Fatal error: protocol.data_received() call failed.')

    def _read_ready__on_eof(self):
    try:
    keep_open = self._protocol.eof_received()
    # 省去一些异常处理
    except BaseException as exc:
    self._fatal_error(
    exc, 'Fatal error: protocol.eof_received() call failed.')
    return

    if keep_open:
    self._loop._remove_reader(self._sock_fd)
    else:
    self.close()

    在去掉其中的异常处理后可以发现,实际上它的工作原理就是通过 sock.recv 接收数据,当收到的数据不为空时就调用 Protocol data_received 把数据传递给开发者定义的方法中,为空时就调用 self._read_ready__on_eof
    _read_ready__on_eof 的逻辑也是很简单的,它会调用 Protocol eof_received 方法获取返回结果,这个结果是开发者定义的,开发者可以定义它返回的是 True 还是 False ,如果是返回 True 则只移除读监听事件,这样方便在关闭连接之前 Socket 还能继续发送消息,如果返回 False 则是直接关闭连接 Socket ,使 Socket 既不能读也不能写。

    此外,还可以从源码看到在执行读事件的回调时如果有异常发生就会调用 _fatal_error 方法进行处理,这个方法除了报告异常外,还会关闭 Socket
    在之前的介绍中 Socket 是拥有读写两个缓冲区,也介绍了 Socket 支持只关闭一个缓冲区,另外一个缓冲区还能继续工作的情况,而在 Transport 中,只关闭写缓冲区称为普通关闭,两个缓冲区都关闭称为强制关闭,它们对应的方法为 close abort ,其中 abort 方法与 _fatal_error 的功能是类似的,它们对应的源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    # 位于:Lib/asyncio/selector_events.py
    class _SelectorTransport(transports._FlowControlMixin, transports.Transport):
    ...
    def abort(self):
    # 强制关闭连接
    self._force_close(None)

    def is_closing(self):
    return self._closing

    def close(self):
    if self._closing:
    return
    # close只移除读事件监听,buffer没有数据才会强制关闭
    self._closing = True
    self._loop._remove_reader(self._sock_fd)
    if not self._buffer:
    self._conn_lost += 1
    self._loop._remove_writer(self._sock_fd)
    self._loop.call_soon(self._call_connection_lost, None)

    def __del__(self, _warn=warnings.warn):
    if self._sock is not None:
    _warn(f"unclosed transport {self!r}", ResourceWarning, source=self)
    # 确保sock被回收,如果没有这个操作,可能会导致内存溢出
    self._sock.close()

    def _fatal_error(self, exc, message='Fatal error on transport'):
    # 省略的异常处理...
    self._force_close(exc)

    def _force_close(self, exc):
    if self._conn_lost:
    # 如果连接丢失就不管了
    return
    if self._buffer:
    # 移除写事件监听以及缓存的buffer
    self._buffer.clear()
    self._loop._remove_writer(self._sock_fd)
    if not self._closing:
    # 移除读事件监听
    self._closing = True
    self._loop._remove_reader(self._sock_fd)
    # 稍后才关闭连接
    self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
    try:
    if self._protocol_connected:
    # 如果当前还处于连接中,那么需要通过Protocol的`connection_lost`方法把连接丢失的原因告诉使用方。
    self._protocol.connection_lost(exc)
    finally:
    # 关闭sock等一切资源
    self._sock.close()
    self._sock = None
    self._protocol = None
    self._loop = None
    server = self._server
    if server is not None:
    server._detach()
    self._server = None

    通过源码可以发现 abort _fatal_error 方法的唯一区别是 _fatal_error 方法会携带异常参数, abort 的异常参数为空,而它们的执行逻辑都是调用 _force_close 方法对 Socket 进行强制关闭。
    _force_close 方法被调用后,它会移除对应的事件监听,并把关闭连接的方法 _call_connection_lost 安排到 Asyncio Event Loop 中,交给 Asyncio Event Loop 调用。
    这样做的目的是考虑到了还有一些读事件和写事件正在等待被 Asyncio Event Loop 执行,这时如果强制关闭 Socket 会导致这些事件被调用时由于 Socket 已经关闭而无法发送或获取数据,所以需要把 _call_connection_lost 的调用安排在读/写事件之后被运行。(asyncio的调度是按照先进先出为原则)

    通过源码也可以发现 close 方法相对 _force_close 方法的唯一的区别是在buffer缓冲不为空时不会移除写事件监听也不会调用 _call_connection_lost 方法,从而确保所有在buffer缓冲区的消息都能正常发送。另外 Transport 还有一个方法– __del__ ,它是确保 Transport 被回收时, Socket 会被完全关闭,不然可能造成内存溢出。

    关于 Python Socket 的内存溢出困扰了多个开源项目多年后才被解决,具体可以通过文章 《Fixing Memory Leaks In Popular Python Libraries》 了解。

    最后,只剩下一个写数据的方法尚未窥探,它的相关源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    # 位于:Lib/asyncio/selector_events.py
    class _SelectorSocketTransport(_SelectorTransport):
    ...

    def write(self, data):
    ...
    # 省略一些校验的
    if self._eof:
    # 已经eof了,不能写数据
    raise RuntimeError('Cannot call write() after write_eof()')
    if not self._buffer:
    # 如果没有buffer,就代表当前的发送缓冲区可能为空,先发发试一试
    try:
    # 先看看能扔多少数据去缓冲区
    n = self._sock.send(data)
    except (BlockingIOError, InterruptedError):
    # 缓冲区满了,无法发送...
    pass
    # 省去一些异常处理
    except BaseException as exc:
    self._fatal_error(exc, 'Fatal write error on socket transport')
    return
    else:
    # 去掉已经扔到缓冲区的数据
    data = data[n:]
    if not data:
    return
    # 如果缓冲区满了,那么还有剩余的数据,需要等待socket可写时才调用回调写数据
    self._loop._add_writer(self._sock_fd, self._write_ready)

    # 把数据加入buffer中,buffer默认是bytesarray,效率很高
    self._buffer.extend(data)
    # 如果写的压力很高,就需要暂停一下
    self._maybe_pause_protocol()

    def _write_ready(self):

    if self._conn_lost:
    return
    try:
    n = self._sock.send(self._buffer)
    # 省去一些异常处理
    except BaseException as exc:
    # 如果有异常,就移除监听,清空buffer
    self._loop._remove_writer(self._sock_fd)
    self._buffer.clear()
    # 抛出异常并通过`force`强制关闭socket
    self._fatal_error(exc, 'Fatal write error on socket transport')
    else:
    if n:
    # 从buffer清除掉已经发送的buffer
    del self._buffer[:n]
    self._maybe_resume_protocol() # May append to buffer.
    if not self._buffer:
    # 发送完了需要进行善后处理,因为有可能socket在发送完消息后就要关闭
    self._loop._remove_writer(self._sock_fd)
    if self._closing:
    self._call_connection_lost(None)
    elif self._eof:
    self._sock.shutdown(socket.SHUT_WR)

    这里比较特别的是写数据的方法 write 是一个普通的函数,这是因为 socket 底层有缓冲区,所以写入数据是非常方便的,且在设置不阻塞后,只要调用 socket.send 就可以把数据投递到缓冲区并马上返回,这个方法不涉及任何IO。

    然而缓冲区也有满的情况,于是 Transport 对缓冲区满的情况做一些处理,正常情况下缓冲区的满有两种情况,如下图:
    asyncio网络编程-socket写缓冲区满的情况.png

    图中假设缓冲区的大小为5,而投递数据的大小为3,对于图的左边,在投递数据的时候会发现缓冲区已经满,这时候操作系统会返回一个错误;而右边是投递时还没满的情况,这种情况下只能投递前面两个消息,此时 Transport 在执行 socket.send 方法后会获得到返回值为2,接着就删除 buffer 中前面的两个消息,只留下一个消息等待缓冲区可投递时再进行投递。

    在后续如果缓冲区还不可投递时且仍有数据通过 write 方法被发送过来, Transport 会把数据添加到 buffer 中,再监听可写事件,当 socket 可写时,才会调用 _write_ready 方法把 buffer 中的数据发送,这一个过程会随着可读事件的监听移除才结束,而只有 buffer 为空或者发送异常时,才会移除可读事件的监听。

    write 源码中还可以看到 _maybe_resume_protocol _maybe_pause_protocol 方法的相关调用,这两个方法都是为了控制写入速度的,毕竟 buffer 的长度是无限的,如果一些恶意客户端与服务端建立请求后,客户端选择拒收消息从而导致 buffer 会堆积一堆数据,而这些数据也是无意义同时在积累过多后可能导致服务端崩溃,所以需要根据 buffer 的积累数据的量决定暂停写入还是恢复写入。

    除此之外,在 _ready_write 方法中还涉及到了 eof 机制, eof end of file 的缩写,它是表示流的结尾的标志。由于TCP是双工的协议,如果其中一端想关闭连接时,另一端可能正在发送数据,虽然程序不需要再写数据了,但不能直接关闭 Socket ,需要获取对方发送过来的所有数据后才能关闭。
    socket eof 机制就是用于告诉对位的读端在收到这个标记后就不需要再接收数据,且后续的数据发送完后也请尽快的关闭。
    Transport 中通过 write_eof 方法提供了一个主动标记写通道为 eof 的功能,使用者也通过 can_write_eof 判断当前 Transport 是否可以使用 eof 机制,它们对应的源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 位于:Lib/asyncio/selector_events.py
    class _SelectorSocketTransport(_SelectorTransport):
    ...
    def write_eof(self):
    if self._closing or self._eof:
    return
    self._eof = True
    if not self._buffer:
    self._sock.shutdown(socket.SHUT_WR)

    def can_write_eof(self):
    return True

    可以看到 Transport 中与 eof 相关的源码很简单,它主要是先标记 _eof 为True,然后再判断当前的 buffer 是否为空,是的话就通过 sock.shutdown(socket.SHUT_WR) 关闭 Socket 中的写缓冲区。如果 buffer 不为空,则不做任何处理,在 _write_ready 发送完 buffer 的所有数据后再调用 sock.shutdown(socket.SHUT_WR) 关闭。

  • 当服务端调用 sock.shutdown(socket.SHUT_WR) 后,客户端会通过 socket.recv 收到一条空消息,客户端会通过空消息判定是服务端到发送端已经 eof 了。
  • shutdown close 的区别: socket 对应的是操作系统的一个资源,多个进程可以拥有同一个 socket 的句柄,当调用 close 时,会把句柄的计数减为1,当句柄技计数为0的时候, socket 才会真正的关闭并释放资源。而 shutdown 则是会关闭底层的连接,比如它可以关闭读端,写端或者同时关闭读写端,并等待对方发送FIN/EOF,但是它不会释放 socket 占用的资源,调用者仍然需要调用 shutdown
  • 4.Stream

    Stream Asyncio 中的高级API,通过 Stream 可以方便的为流式服务进行编程,同时通过 Stream 编写出来的代码既简洁又容易理解,如官方文档的 TCP Echo client 示例代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    import asyncio

    async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
    '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

    asyncio.run(tcp_echo_client('Hello World!'))

    可以看到,这份示例代码十分简单,它在初始化时会返回 reader writer 对象,后续调用者可以通过 reader 读取消息,并通过 writer 发送消息和关闭连接。
    而基于 Stream 接口编写的服务端的也变得简单了, TCP Echo Server 源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    import asyncio

    async def handle_echo(reader, writer):
    addr = writer.get_extra_info('peername')

    data = await reader.read(100)
    message = data.decode()
    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

    async def main():
    server = await asyncio.start_server(
    handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
    await server.serve_forever()

    asyncio.run(main())

    通过代码可以发现,基于 Stream 编写的TCP Echo服务中 main 函数的用法与 Protocol & Transport 类似创建一个 Server 实例,而处理连接的 handle_echo 方法的代码量比 Protocol & Transport 还要少,同时它不再像 Protocol & Transport 一样包含大量的异步回调方法。

    下图是 Stream 客户端与 Stream 服务端的 TCP Echo 示例:
    1693300383481asyncio网络编程-Stream.png

    通过图可以发现它们交互的形式与 Protocol & Transport 类似,实际上 Stream 的核心 StreamProtocol 就是基于 Protocol & Transport 进行拓展,然后 Stream 还抽象出 StreamReader StreamWrite 两个对象用于连接用户和 StreamProtocol 的数据交互,使用户可以使用同步的思想进行网络编程。

    4.1.源码分析

    Stream 最大的特点就是把回调事件转为同步给用户使用以及运用了一些限流手段,但是它把细节全都隐藏起来了,需要通过源码去窥探它的运行逻辑。

    首先是客户端和服务端建立连接和初始化的方法,它们的源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    # 位于:Lib/asyncio/stream.py
    async def open_connection(host=None, port=None, *, limit=_DEFAULT_LIMIT, **kwds):
    # 客户端建立连接的方法
    loop = events.get_running_loop()
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, loop=loop)
    transport, _ = await loop.create_connection(
    lambda: protocol, host, port, **kwds)
    writer = StreamWriter(transport, protocol, reader, loop)
    return reader, writer


    async def start_server(client_connected_cb, host=None, port=None, *, limit=_DEFAULT_LIMIT, **kwds):
    # 服务端建立连接的方法
    loop = events.get_running_loop()

    def factory():
    reader = StreamReader(limit=limit, loop=loop)
    protocol = StreamReaderProtocol(reader, client_connected_cb, loop=loop)
    return protocol

    return await loop.create_server(factory, host, port, **kwds)

    通过源码可以知道, open_connection start_server 类似,它们都是先创建好 StreamReaderProtocol ,再交给 create_connection 或者 create_server 运行。
    StreamReaderProtocol 是继承于 Protocol ,它相比于 Protocol 多了一些适配了 StreamReader StreamWriter 的调用。

    StreamProtocol 只是一个中间者,很多逻辑与 Protocol & Transport 类似,故不会进行分析。

    4.1.1.StreamWriter

    对于 StreamWriter 可以认为是 Transport 的代理对象,所以它负责的也是主动调用的那部分,它的很多方法都是直接调用到 Transport 的同名方法,它与 Transport 的唯一区别就是多了 wait_closed drain 方法两个方法,对应的源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    # 位于:Lib/asyncio/stream.py
    class StreamWriter:
    # 初始化...
    def __init__(self, transport, protocol, reader, loop):
    self._transport = transport
    self._protocol = protocol
    self._reader = reader
    self._loop = loop

    # 省略了一堆代理transport的方法
    ...

    # 相比transport多出来的方法
    async def wait_closed(self):
    await self._protocol._get_close_waiter(self)

    async def drain(self):
    if self._reader is not None:
    exc = self._reader.exception()
    if exc is not None:
    raise exc
    if self._transport.is_closing():
    await sleep(0)
    await self._protocol._drain_helper()

    源码中的 wait_closed 是等待 Protocol _get_close_waiter 的方法,这个方法会一直阻塞直到 Protocol connection_lost 被调用,所以在调用 await transport.wait_closed() 实际上等于 Protocol & Transport 示例代码中的 on_con_lost
    drain 方法则是 Stream 的流控制功能,它是结合 Write 一起使用的,它能防止写缓冲区被写满,以及及早的发现写缓冲区的异常,所以在使用 Stream 进行网络编程时,在执行 write.write() 后,一定要调用 await write.drain() 方法。

    此外,源码中的 drain 方法有一句奇特的代码– await sleep(0) ,这段代码出现在这里的原因是在 Protocol & Transport 中, write 的调用出错后会让事件循环尽快的执行 _connection_lost 的调用,但是用户在调用

    1
    2
    write.write()
    await write.drain()

    的过程中并没有让步在事件循环,如果不添加 await sleep(0) 显式的让步给事件循环,就会导致 Protocol _drain_helper 方法会在 _connection_lost 之前调用,最终导致写入数据这个操作无法感知到连接已经丢失。

    drain_helper 方法是与流控制相结合的,它在 Protocol & Transport 调用了 _maybe_pause_protocol 时会阻塞,直到 _maybe_resume_protocol 被调用的时候才会释放,流控制的相关代码可以通过 FlowControlMixin的源码 了解。

    4.1.1.StreamReader

    用户在调用 StreamReader 时,通常都会调用到 read* 系列方法来获取数据,但是在 Protocol & Transport 中的数据是通过回调把数据传递给用户的。 Stream 为了解决这个问题,它把 StreamReader 设计成一个类似于先进先出的容器,当 Protocol & Transport 有数据时会把数据通过投喂数据的方法写入到 SteamReader 中,并提供获取数据,中止投递数据等方法交给用户调用。

    其中投喂数据的源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    class StreamReader:

    def _wakeup_waiter(self):
    # 告诉read系列的方法,已经有数据进来了,可以继续执行了
    waiter = self._waiter
    if waiter is not None:
    self._waiter = None
    if not waiter.cancelled():
    waiter.set_result(None)

    def feed_eof(self):
    # 投递了一个eof的信息
    self._eof = True
    self._wakeup_waiter()

    def at_eof(self):
    """给StreamProtocol判断是否需要投递`eof`"""
    return self._eof and not self._buffer

    def feed_data(self, data):
    assert not self._eof, 'feed_data after feed_eof'

    if not data:
    return

    # 把数据添加到buffer中
    self._buffer.extend(data)
    # 通过`wakeup_waiter`唤醒read系列方法
    self._wakeup_waiter()

    if (self._transport is not None and not self._paused and len(self._buffer) > 2 * self._limit):
    # 如果当前的buffer数据超过了限制,就暂停数据的读取
    try:
    self._transport.pause_reading()
    except NotImplementedError:
    # 如果当前transport不支持暂停,就直接忘掉它,后面需要调用者通过`set_transport`重新设置transport
    self._transport = None
    else:
    self._paused = True

    通过源码可以看出, StreamReader 中的投喂数据方法和用户调用方法会共享 self._waiter self._buffer 对象,其中 self._buffer 用于接收和读取数据,而 self._waiter 用于通知 read 系列等方法,告诉它们有数据来了,可以继续处理, StreamReader 的读数据相关源码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    class StreamReader:
    ... # 省略设置异常的方法

    def _maybe_resume_transport(self):
    # 由于读取了数据了,`buffer`当前的水位已经降低,需要尝试恢复从transport读取数据。
    if self._paused and len(self._buffer) <= self._limit:
    self._paused = False
    self._transport.resume_reading()

    async def _wait_for_data(self, func_name):
    if self._waiter is not None:
    raise RuntimeError(f'{func_name}() called while another coroutine is already waiting for incoming data')

    # 如果已经被暂停,那么应该恢复数据的读取
    if self._paused:
    self._paused = False
    self._transport.resume_reading()

    # 创建_waiter并等待被唤醒。
    self._waiter = self._loop.create_future()
    try:
    await self._waiter
    finally:
    self._waiter = None

    async def read(self, n=-1):
    # 有异常直接抛异常
    if self._exception is not None:
    raise self._exception

    if n == 0:
    # 如果为0,那么直接返回
    return b''

    # 省去小于0的情况,小于0时会一直获取数据,直到接收到EOF
    ...

    # 如果没有buffer,则需要等待
    if not self._buffer and not self._eof:
    await self._wait_for_data('read')

    # 从buffer获取数据(同时也清空了它们在buffer中的占用)
    data = bytes(self._buffer[:n])
    del self._buffer[:n]

    self._maybe_resume_transport()
    return data

    通过源码可以看到 StreamReader 获取数据的方法比 Protocol & Transport 复杂了一点,它在调用 read 时,如果发现 buffer 有数据,就直接返回数据,否则就需要通过 wait_for_data 方法等待 waiter 对象被投喂数据的方法设置为不阻塞。

    在这个流程中, Stream 通过 waiter 对象和 buffer 完成异步回调到同步调用的转换,但是这样会导致每当有一条消息进来的时候, StreamReader 需要通过 Asyncio Event Loop 的两次调用才能获取到消息,这也正是 Stream Protocol & Protocol 性能差点原因。

    5.总结

    通过 Protocol & Transport 中的源码可以看到, Protocol & Transport 通过一次事件批量 accept 以及使用了 buffer 加快了发送速度来获得了比 Asyncio Socket 高出很多的性能,而 Stream 通过 asyncio.Future asyncio.Queue Protocol & Transport 的异步回调转换为同步调用,以一定的性能消耗换取了易用性。

    在TCP编程的场景中,这两个网络编程接口都有它们对应的使用场景,我们可以通过使用场景来选择对应的接口进行网络编程开发,通常情况下,默认服务端都会使用 Protocol & Transport 进行网络编程开发,因为它们都会追求极高的性能。
    而客户端则默认会使用 Stream 进行网络编程开发,因为客户端会偏通过同步调用的方式进行开发,如果使用 Protocol & Transport 进行开发,也需要用到 asyncio.Future asycnio.Queue 容器把异步回调转换为同步调用,这样一来使用 Protocol & Transport 开发和使用 Stream 进行开发的客户端性能是差不多的。

    本文是偏理论的分析 Python Asyncio 的网络编程相关的接口原理,在后续将介绍如何通过 Protocol & Transport 开发一个Web框架。