import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
参见下面的 Examples 部分。
Stream 函数
下面的高级 asyncio 函数可以用来创建和处理流:
coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)
建立网络连接并返回一对 (reader, writer)
对象。
返回的 reader 和 writer 对象是 StreamReader
和 StreamWriter
类的实例。
limit 确定返回的 StreamReader
实例使用的缓冲区大小限制。默认情况下,limit 设置为 64 KiB 。
其余的参数直接传递到 loop.create_connection()
。
The sock argument transfers ownership of the socket to the
StreamWriter
created. To close the socket, call its
close()
method.
在 3.7 版更改: Added the ssl_handshake_timeout parameter.
3.8 新版功能: Added happy_eyeballs_delay and interleave parameters.
在 3.10 版更改: Removed the loop parameter.
coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)
启动套接字服务。
当一个新的客户端连接被建立时,回调函数 client_connected_cb 会被调用。该函数会接收到一对参数 (reader, writer)
,reader是类 StreamReader
的实例,而writer是类 StreamWriter
的实例。
client_connected_cb 即可以是普通的可调用对象也可以是一个 协程函数; 如果它是一个协程函数,它将自动作为 Task
被调度。
limit 确定返回的 StreamReader
实例使用的缓冲区大小限制。默认情况下,limit 设置为 64 KiB 。
余下的参数将会直接传递给 loop.create_server()
.
The sock argument transfers ownership of the socket to the
server created. To close the socket, call the server’s
close()
method.
在 3.7 版更改: Added the ssl_handshake_timeout and start_serving parameters.
在 3.10 版更改: Removed the loop parameter.
Unix 套接字
coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)
建立一个 Unix 套接字连接并返回 (reader, writer)
这对返回值。
与 open_connection()
相似,但是是在 Unix 套接字上的操作。
请看文档 loop.create_unix_connection()
.
The sock argument transfers ownership of the socket to the
StreamWriter
created. To close the socket, call its
close()
method.
可用性: Unix。
在 3.7 版更改: Added the ssl_handshake_timeout parameter.
The path parameter can now be a path-like object
在 3.10 版更改: Removed the loop parameter.
coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)
启动一个 Unix 套接字服务。
与 start_server()
相似,但是是在 Unix 套接字上的操作。
请看文档 loop.create_unix_server()
.
The sock argument transfers ownership of the socket to the
server created. To close the socket, call the server’s
close()
method.
可用性: Unix。
在 3.7 版更改: Added the ssl_handshake_timeout and start_serving parameters.
The path parameter can now be a path-like object.
在 3.10 版更改: Removed the loop parameter.
class asyncio.StreamReader
这个类表示一个读取器对象,该对象提供api以便于从IO流中读取数据。
不推荐直接实例化 StreamReader 对象,建议使用 open_connection()
和 start_server()
来获取 StreamReader 实例。
coroutine read(n=- 1)
至多读取 n 个byte。 如果没有设置 n , 则自动置为 -1
, -1时表示读至 EOF 并返回所有读取的byte。
如果读到EOF,且内部缓冲区为空,则返回一个空的 bytes
对象。
coroutine readuntil(separator=b'\n')
从流中读取数据直至遇到 separator
成功后,数据和指定的separator将从内部缓冲区中删除(或者说被消费掉)。返回的数据将包括在末尾的指定separator。
如果读取的数据量超过了配置的流限制,将引发 LimitOverrunError
异常,数据将留在内部缓冲区中并可以再次读取。
如果在找到完整的separator之前到达EOF,则会引发 IncompleteReadError
异常,并重置内部缓冲区。 IncompleteReadError.partial
属性可能包含指定separator的一部分。
3.5.2 新版功能.
class asyncio.StreamWriter
这个类表示一个写入器对象,该对象提供api以便于写数据至IO流中。
不建议直接实例化 StreamWriter;而应改用 open_connection()
和 start_server()
。
write(data)
此方法会尝试立即将 data 写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。
此方法应当与 drain()
方法一起使用:
stream.write(data)
await stream.drain()
writelines(data)
此方法会立即尝试将一个字节串列表(或任何可迭代对象)写入到下层的套接字。 如果写入失败,数据会被排入内部写缓冲队列直到可以被发送。
此方法应当与 drain()
方法一起使用:
stream.writelines(lines)
await stream.drain()
coroutine start_tls(sslcontext, \*, server_hostname=None, ssl_handshake_timeout=None)
Upgrade an existing stream-based connection to TLS.
Parameters:
sslcontext: a configured instance of SSLContext
.
server_hostname: sets or overrides the host name that the target
server’s certificate will be matched against.
ssl_handshake_timeout is the time in seconds to wait for the TLS
handshake to complete before aborting the connection. 60.0
seconds
if None
(default).
3.11 新版功能.
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
使用低层级 loop.create_connection()
方法的 TCP 回显客户端协议 示例。
使用流的 TCP 回显服务器
TCP 回显服务器使用 asyncio.start_server()
函数:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
使用 loop.create_server()
方法的 TCP 回显服务器协议 示例。
获取 HTTP 标头
查询命令行传入 URL 的 HTTP 标头的简单示例:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
python example.py http://example.com/path/page.html
或使用 HTTPS:
python example.py https://example.com/path/page.html
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
使用低层级协议以及 loop.create_connection()
方法的 注册一个打开的套接字以等待使用协议的数据 示例。
使用低层级的 loop.add_reader()
方法来监视文件描述符的 监视文件描述符以读取事件 示例。
This page is licensed under the Python Software Foundation License Version 2.
Examples, recipes, and other code in the documentation are additionally licensed under the Zero Clause BSD License.
See History and License for more information.
The Python Software Foundation is a non-profit corporation.
Please donate.
最后更新于 10月 12, 2022.
Found a bug?