添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

源码: Lib/asyncio/streams.py

流是用于处理网络连接的支持 async/await 的高层级原语。 流允许发送和接收数据,而不需要使用回调或低级协议和传输。

下面是一个使用 asyncio streams 编写的 TCP echo 客户端示例:

import asyncio
async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)
    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()
    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')
    print('Close the connection')
    writer.close()
    await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))

参见下面的 Examples 部分。

Stream 函数

下面的高级 asyncio 函数可以用来创建和处理流:

coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)

建立网络连接并返回一对 (reader, writer) 对象。

返回的 readerwriter 对象是 StreamReaderStreamWriter 类的实例。

limit 确定返回的 StreamReader 实例使用的缓冲区大小限制。默认情况下,limit 设置为 64 KiB 。

其余的参数直接传递到 loop.create_connection()

The sock argument transfers ownership of the socket to the StreamWriter created. To close the socket, call its close() method.

在 3.7 版更改: Added the ssl_handshake_timeout parameter.

3.8 新版功能: Added happy_eyeballs_delay and interleave parameters.

在 3.10 版更改: Removed the loop parameter.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

启动套接字服务。

当一个新的客户端连接被建立时,回调函数 client_connected_cb 会被调用。该函数会接收到一对参数 (reader, writer) ,reader是类 StreamReader 的实例,而writer是类 StreamWriter 的实例。

client_connected_cb 即可以是普通的可调用对象也可以是一个 协程函数; 如果它是一个协程函数,它将自动作为 Task 被调度。

limit 确定返回的 StreamReader 实例使用的缓冲区大小限制。默认情况下,limit 设置为 64 KiB 。

余下的参数将会直接传递给 loop.create_server().

The sock argument transfers ownership of the socket to the server created. To close the socket, call the server’s close() method.

在 3.7 版更改: Added the ssl_handshake_timeout and start_serving parameters.

在 3.10 版更改: Removed the loop parameter.

Unix 套接字

coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

建立一个 Unix 套接字连接并返回 (reader, writer) 这对返回值。

open_connection() 相似,但是是在 Unix 套接字上的操作。

请看文档 loop.create_unix_connection().

The sock argument transfers ownership of the socket to the StreamWriter created. To close the socket, call its close() method.

可用性: Unix。

在 3.7 版更改: Added the ssl_handshake_timeout parameter. The path parameter can now be a path-like object

在 3.10 版更改: Removed the loop parameter.

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

启动一个 Unix 套接字服务。

start_server() 相似,但是是在 Unix 套接字上的操作。

请看文档 loop.create_unix_server() .

The sock argument transfers ownership of the socket to the server created. To close the socket, call the server’s close() method.

可用性: Unix。

在 3.7 版更改: Added the ssl_handshake_timeout and start_serving parameters. The path parameter can now be a path-like object.

在 3.10 版更改: Removed the loop parameter.

class asyncio.StreamReader

这个类表示一个读取器对象,该对象提供api以便于从IO流中读取数据。

不推荐直接实例化 StreamReader 对象,建议使用 open_connection()start_server() 来获取 StreamReader 实例。

coroutine read(n=- 1)

至多读取 n 个byte。 如果没有设置 n , 则自动置为 -1 , -1时表示读至 EOF 并返回所有读取的byte。

如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes 对象。

coroutine readuntil(separator=b'\n')

从流中读取数据直至遇到 separator

成功后,数据和指定的separator将从内部缓冲区中删除(或者说被消费掉)。返回的数据将包括在末尾的指定separator。

如果读取的数据量超过了配置的流限制,将引发 LimitOverrunError 异常,数据将留在内部缓冲区中并可以再次读取。

如果在找到完整的separator之前到达EOF,则会引发 IncompleteReadError 异常,并重置内部缓冲区。 IncompleteReadError.partial 属性可能包含指定separator的一部分。

3.5.2 新版功能.

class asyncio.StreamWriter

这个类表示一个写入器对象,该对象提供api以便于写数据至IO流中。

不建议直接实例化 StreamWriter;而应改用 open_connection()start_server()

write(data)

此方法会尝试立即将 data 写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

此方法应当与 drain() 方法一起使用:

stream.write(data)
await stream.drain()
writelines(data)

此方法会立即尝试将一个字节串列表(或任何可迭代对象)写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。

此方法应当与 drain() 方法一起使用:

stream.writelines(lines)
await stream.drain()
coroutine start_tls(sslcontext, \*, server_hostname=None, ssl_handshake_timeout=None)

Upgrade an existing stream-based connection to TLS.

Parameters:

  • sslcontext: a configured instance of SSLContext.

  • server_hostname: sets or overrides the host name that the target server’s certificate will be matched against.

  • ssl_handshake_timeout is the time in seconds to wait for the TLS handshake to complete before aborting the connection. 60.0 seconds if None (default).

  • 3.11 新版功能.

    async def tcp_echo_client(message): reader, writer = await asyncio.open_connection( '127.0.0.1', 8888) print(f'Send: {message!r}') writer.write(message.encode()) await writer.drain() data = await reader.read(100) print(f'Received: {data.decode()!r}') print('Close the connection') writer.close() asyncio.run(tcp_echo_client('Hello World!'))

    使用低层级 loop.create_connection() 方法的 TCP 回显客户端协议 示例。

    使用流的 TCP 回显服务器

    TCP 回显服务器使用 asyncio.start_server() 函数:

    import asyncio
    async def handle_echo(reader, writer):
        data = await reader.read(100)
        message = data.decode()
        addr = writer.get_extra_info('peername')
        print(f"Received {message!r} from {addr!r}")
        print(f"Send: {message!r}")
        writer.write(data)
        await writer.drain()
        print("Close the connection")
        writer.close()
    async def main():
        server = await asyncio.start_server(
            handle_echo, '127.0.0.1', 8888)
        addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
        print(f'Serving on {addrs}')
        async with server:
            await server.serve_forever()
    asyncio.run(main())
    

    使用 loop.create_server() 方法的 TCP 回显服务器协议 示例。

    获取 HTTP 标头

    查询命令行传入 URL 的 HTTP 标头的简单示例:

    import asyncio
    import urllib.parse
    import sys
    async def print_http_headers(url):
        url = urllib.parse.urlsplit(url)
        if url.scheme == 'https':
            reader, writer = await asyncio.open_connection(
                url.hostname, 443, ssl=True)
        else:
            reader, writer = await asyncio.open_connection(
                url.hostname, 80)
        query = (
            f"HEAD {url.path or '/'} HTTP/1.0\r\n"
            f"Host: {url.hostname}\r\n"
            f"\r\n"
        writer.write(query.encode('latin-1'))
        while True:
            line = await reader.readline()
            if not line:
                break
            line = line.decode('latin1').rstrip()
            if line:
                print(f'HTTP header> {line}')
        # Ignore the body, close the socket
        writer.close()
    url = sys.argv[1]
    asyncio.run(print_http_headers(url))
    
    python example.py http://example.com/path/page.html
    

    或使用 HTTPS:

    python example.py https://example.com/path/page.html
    async def wait_for_data():
        # Get a reference to the current event loop because
        # we want to access low-level APIs.
        loop = asyncio.get_running_loop()
        # Create a pair of connected sockets.
        rsock, wsock = socket.socketpair()
        # Register the open socket to wait for data.
        reader, writer = await asyncio.open_connection(sock=rsock)
        # Simulate the reception of data from the network
        loop.call_soon(wsock.send, 'abc'.encode())
        # Wait for data
        data = await reader.read(100)
        # Got data, we are done: close the socket
        print("Received:", data.decode())
        writer.close()
        # Close the second socket
        wsock.close()
    asyncio.run(wait_for_data())
    

    使用低层级协议以及 loop.create_connection() 方法的 注册一个打开的套接字以等待使用协议的数据 示例。

    使用低层级的 loop.add_reader() 方法来监视文件描述符的 监视文件描述符以读取事件 示例。

    This page is licensed under the Python Software Foundation License Version 2. Examples, recipes, and other code in the documentation are additionally licensed under the Zero Clause BSD License. See History and License for more information.
    The Python Software Foundation is a non-profit corporation. Please donate. 最后更新于 10月 12, 2022. Found a bug?