添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

这篇文章是《深入理解 asyncio》的第三篇,主要包含回调和在 asyncio 中执行同步代码。

可以给 Task (Future) 添加回调函数,等 Task 完成后就会自动调用这个 (些) 回调:

async def a():
    await asyncio.sleep(1)
    return 'A'
In : loop = asyncio.get_event_loop()
In : task = loop.create_task(a())
In : def callback(future):
...:     print(f'Result: {future.result()}')
...:
In : task.add_done_callback(callback)
In : await task
Result: A
Out: 'A'

可以看到在任务完成后执行了 callback 函数。我这里顺便解释一个问题,不知道有没有人注意到。

为什么之前一直推荐大家用asyncio.create_task,但是很多例子却用了loop.create_task

这是因为在 IPython 里面支持方便的使用 await 执行协程,但如果直接用asyncio.create_task会报「no running event loop」:

In : asyncio.create_task(a())
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-2-2a742a8da161> in <module>
----> 1 asyncio.create_task(a())
/usr/local/lib/python3.7/asyncio/tasks.py in create_task(coro)
    322     Return a Task object.
    323     """
--> 324     loop = events.get_running_loop()
    325     return loop.create_task(coro)
RuntimeError: no running event loop

Eventloop 是在单进程里面的单线程中的,在 IPython 里面 await 的时候会把协程注册到一个线程的 Eventloop 上,但是 REPL 环境是另外一个线程,不是一个线程,所以会提示这个错误,即便asyncio.events._set_running_loop(loop)设置了 loop,任务可以创建倒是不能 await:因为 task 是在线程 X 的 Eventloop 上注册的,但是 await 时却到线程 Y 的 Eventloop 上去执行。这部分是 C 实现的,可以看延伸阅读链接 1。

所以现在你就会看到很多loop.create_task的代码片段,别担心,在代码项目里面都是用asyncio.create_task的,如果你非常想要在 IPython 里面使用asyncio.create_task也不是没有办法,可以这样做:

In : loop = asyncio.get_event_loop()
In : def loop_runner(coro):
...:     asyncio.events._set_running_loop(None)
...:     loop.run_until_complete(coro)
...:     asyncio.events._set_running_loop(loop)
...:
In : %autoawait loop_runner
In : asyncio.events._set_running_loop(loop)
In : task = asyncio.create_task(a())
In : await task
Out: 'A'

这样就可以啦。我解释下为什么:

  • IPython 里面能运行 await 是由于 loop_runner 函数,这个函数能运行协程 (延伸阅读链接 2),默认的效果大概是 asyncio.get_event_loop ().run_until_complete (coro)。为了让 asyncio.create_task 正常运行我定义了新的 loop_runner
  • 通过 autoawait 这个 magic 函数就可以重新设置 loop_runner
  • 上面的报错是「no running event loop」,所以通过 events._set_running_loop (loop) 设置一个正在运行的 loop,但是在默认的 loop_runner 中也无法运行,会报「Cannot run the event loop while another loop is running」,所以重置 await 里面那个 running 的 loop,运行结束再设置回去。
  • 如果你觉得有必要,可以在 IPython 配置文件中设置这个 loop_runner 到c.InteractiveShell.loop_runner上~

    好,我们说回来,add_done_callback方法也是支持参数的,但是需要用到functools.partial

    def callback2(future, n):
        print(f'Result: {future.result()}, N: {n}')
    In : task = loop.create_task(a())
    In : task.add_done_callback(partial(callback2, n=1))
    In : await task
    Result: A, N: 1
    Out: 'A'
    

    asyncio 提供了 3 个按需回调的方法,都在 Eventloop 对象上,而且也支持参数:

    call_soon

    在下一次事件循环中被回调,回调是按其注册顺序被调用的:

    def mark_done(future, result):
        print(f'Set to: {result}')
        future.set_result(result)
    async def b1():
        loop = asyncio.get_event_loop()
        fut = asyncio.Future()
        loop.call_soon(mark_done, fut, 'the result')
        loop.call_soon(partial(print, 'Hello', flush=True))
        loop.call_soon(partial(print, 'Greeting', flush=True))
        print(f'Done: {fut.done()}')
        await asyncio.sleep(0)
        print(f'Done: {fut.done()}, Result: {fut.result()}')
    In : await b1()
    Done: False
    Set to: the result
    Hello
    Greeting
    Done: True, Result: the result
    

    这个例子输出的比较复杂,我挨个分析:

  • call_soon 可以用来设置任务的结果:在 mark_done 里面设置
  • 通过 2 个 print 可以感受到 call_soon 支持参数。
  • 最重要的就是输出部分了,首先 fut.done () 的结果是 False,因为还没到下个事件循环,sleep (0) 就可以切到下次循环,这样就会调用三个 call_soon 回调,最后再看 fut.done () 的结果就是 True,而且 < code>fut.result () 可以拿到之前在 mark_done 设置的值了
  • call_later

    安排回调在给定的时间 (单位秒) 后执行:

    async def b2():
        loop = asyncio.get_event_loop()
        fut = asyncio.Future()
        loop.call_later(2, mark_done, fut, 'the result')
        loop.call_later(1, partial(print, 'Hello'))
        loop.call_later(1, partial(print, 'Greeting'))
        print(f'Done: {fut.done()}')
        await asyncio.sleep(2)
        print(f'Done: {fut.done()}, Result: {fut.result()}')
    In : await b2()
    Done: False
    Hello
    Greeting
    Set to: the result
    Done: True, Result: the result
    

    这次要注意 3 个回调的延迟时间时间要<=sleep 的,要不然还没来及的回调程序就结束了

    call_at

    安排回调在给定的时间执行,注意这个时间要基于loop.time()获取当前时间:

    async def b3():
        loop = asyncio.get_event_loop()
        now = loop.time()
        fut = asyncio.Future()
        loop.call_at(now + 2, mark_done, fut, 'the result')
        loop.call_at(now + 1, partial(print, 'Hello', flush=True))
        loop.call_at(now + 1, partial(print, 'Greeting', flush=True))
        print(f'Done: {fut.done()}')
        await asyncio.sleep(2)
        print(f'Done: {fut.done()}, Result: {fut.result()}')
    In : await b3()
    Done: False
    Hello
    Greeting
    Set to: the result
    Done: True, Result: the result
    

    前面的代码都是异步的,就如 sleep,需要用asyncio.sleep而不是阻塞的time.sleep,如果有同步逻辑,怎么;利用 asyncio 实现并发呢?答案是用run_in_executor。在一开始我说过开发者创建 Future 对象情况很少,主要是用run_in_executor,就是让同步函数在一个执行器 (executor) 里面运行:

    def a():
        time.sleep(1)
        return 'A'
    async def b():
        await asyncio.sleep(1)
        return 'B'
    def show_perf(func):
        print('*' * 20)
        start = time.perf_counter()
        asyncio.run(func())
        print(f'{func.__name__} Cost: {time.perf_counter() - start}')
    async def c1():
        loop = asyncio.get_running_loop()
        await asyncio.gather(
            loop.run_in_executor(None, a),
    In : show_perf(c1)
    ********************
    c1 Cost: 1.0027242230000866
    

    可以看到用run_into_executor可以把同步函数逻辑转化成一个协程,且实现了并发。这里要注意细节,就是函数 a 是普通函数,不能写成协程,下面的定义是错误的,不能实现并发:

    async def a():
        time.sleep(1)
        return 'A'
    

    因为 a 里面没有异步代码,就不要用async def来定义。需要把这种逻辑用loop.run_in_executor封装到协程:

    async def c():
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, a)
    

    大家理解了吧?

    loop.run_in_executor(None, a)这里面第一个参数是要传递concurrent.futures.Executor实例的,传递 None 会选择默认的 executor:

    In : loop._default_executor
    Out: <concurrent.futures.thread.ThreadPoolExecutor at 0x112b60e80>
    

    当然我们还可以用进程池,这次换个常用的文件读写例子,并且用:

    async def c3():
        loop = asyncio.get_running_loop()
        with concurrent.futures.ProcessPoolExecutor() as e:
            print(await asyncio.gather(
                loop.run_in_executor(e, a),
    In : show_perf(c3)
    ********************
    ['A', 'B']
    c3 Cost: 1.0218078890000015
    

    上一个小节用的run_in_executor就如它方法的名字所示,把协程放到了一个执行器里面,可以在一个线程池,也可以在一个进程池。另外还可以使用run_coroutine_threadsafe在其他线程执行协程(这是线程安全的):

    def start_loop(loop):
        asyncio.set_event_loop(loop)
        loop.run_forever()
    def shutdown(loop):
        loop.stop()
    async def b1():
        new_loop = asyncio.new_event_loop()
        t = Thread(target=start_loop, args=(new_loop,))
        t.start()
        future = asyncio.run_coroutine_threadsafe(a(), new_loop)
        print(future)
        print(f'Result: {future.result(timeout=2)}')
        new_loop.call_soon_threadsafe(partial(shutdown, new_loop))
    In : await b1()
    <Future at 0x107edf4e0 state=pending>
    Result: A
    

    这里面有几个细节要注意:

  • 协程应该从另一个线程中调用,而非事件循环运行所在线程,所以用 asyncio.new_event_loop () 新建一个事件循环
  • 在执行协程前要确保新创建的事件循环是运行着的,所以需要用 start_loop 之类的方式启动循环
  • 接着就可以用 asyncio.run_coroutine_threadsafe 执行协程 a 了,它返回了一个 Future 对象
  • 可以通过输出感受到 future 一开始是 pending 的,因为协程 a 里面会 sleep 1 秒才返回结果
  • future.result (timeout=2) 就可以获得结果,设置 timeout 的值要大于 a 协程执行时间,要不然会抛出 TimeoutError
  • 一开始我们创建的新的事件循环跑在一个线程里面,由于 loop.run_forever 会阻塞程序关闭,所以需要结束时杀掉线程,所以用 call_soon_threadsafe 回调函数 shutdown 去停止事件循环
  • 这里再说一下call_soon_threadsafe,看名字就知道它是线程安全版本的call_soon,其实就是在另外一个线程里面调度回调。BTW, 其实asyncio.run_coroutine_threadsafe底层也是用的它。

    本文代码可以在 mp 项目 找到

  • https://github.com/python/cpython/blob/3.7/Modules/_asynciomodule.c#L286
  • https://ipython.readthedocs.io/en/stable/interactive/autoawait.html
  • https://docs.python.org/3.8/library/asyncio-task.html
  •