async def a ():
await asyncio . sleep ( 1 )
return 'A'
In : loop = asyncio . get_event_loop ()
In : task = loop . create_task ( a ())
In : def callback ( future ):
... : print ( f 'Result: { future . result () } ' )
... :
In : task . add_done_callback ( callback )
In : await task
Result : A
Out : 'A'
可以看到在任务完成后执行了 callback 函数。我这里顺便解释一个问题,不知道有没有人注意到。
为什么之前一直推荐大家用asyncio.create_task
,但是很多例子却用了loop.create_task
?
这是因为在 IPython 里面支持方便的使用 await 执行协程,但如果直接用asyncio.create_task
会报「no running event loop」:
In : asyncio . create_task ( a ())
---------------------------------------------------------------------------
RuntimeError Traceback ( most recent call last )
< ipython - input - 2 - 2 a742a8da161 > in < module >
----> 1 asyncio . create_task ( a ())
/ usr / local / lib / python3 .7 / asyncio / tasks . py in create_task ( coro )
322 Return a Task object .
323 """
--> 324 loop = events.get_running_loop()
325 return loop.create_task(coro)
RuntimeError: no running event loop
Eventloop 是在单进程里面的单线程中的,在 IPython 里面 await 的时候会把协程注册到一个线程的 Eventloop 上,但是 REPL 环境是另外一个线程,不是一个线程,所以会提示这个错误,即便asyncio.events._set_running_loop(loop)
设置了 loop,任务可以创建倒是不能 await:因为 task 是在线程 X 的 Eventloop 上注册的,但是 await 时却到线程 Y 的 Eventloop 上去执行。这部分是 C 实现的,可以看延伸阅读链接 1。
所以现在你就会看到很多loop.create_task
的代码片段,别担心,在代码项目里面都是用asyncio.create_task
的,如果你非常想要在 IPython 里面使用asyncio.create_task
也不是没有办法,可以这样做:
In : loop = asyncio . get_event_loop ()
In : def loop_runner ( coro ):
... : asyncio . events . _set_running_loop ( None )
... : loop . run_until_complete ( coro )
... : asyncio . events . _set_running_loop ( loop )
... :
In : % autoawait loop_runner
In : asyncio . events . _set_running_loop ( loop )
In : task = asyncio . create_task ( a ())
In : await task
Out : 'A'
这样就可以啦。我解释下为什么:
IPython 里面能运行 await 是由于 loop_runner 函数,这个函数能运行协程 (延伸阅读链接 2),默认的效果大概是 asyncio.get_event_loop ().run_until_complete (coro)
。为了让 asyncio.create_task
正常运行我定义了新的 loop_runner
通过 autoawait 这个 magic 函数就可以重新设置 loop_runner
上面的报错是「no running event loop」,所以通过 events._set_running_loop (loop)
设置一个正在运行的 loop,但是在默认的 loop_runner 中也无法运行,会报「Cannot run the event loop while another loop is running」,所以重置 await 里面那个 running 的 loop,运行结束再设置回去。
如果你觉得有必要,可以在 IPython 配置文件中设置这个 loop_runner 到c.InteractiveShell.loop_runner
上~
好,我们说回来,add_done_callback
方法也是支持参数的,但是需要用到functools.partial
:
def callback2 ( future , n ):
print ( f 'Result: { future . result () } , N: { n } ' )
In : task = loop . create_task ( a ())
In : task . add_done_callback ( partial ( callback2 , n = 1 ))
In : await task
Result : A , N : 1
Out : 'A'
asyncio 提供了 3 个按需回调的方法,都在 Eventloop 对象上,而且也支持参数:
call_soon
在下一次事件循环中被回调,回调是按其注册顺序被调用的:
def mark_done ( future , result ):
print ( f 'Set to: { result } ' )
future . set_result ( result )
async def b1 ():
loop = asyncio . get_event_loop ()
fut = asyncio . Future ()
loop . call_soon ( mark_done , fut , 'the result' )
loop . call_soon ( partial ( print , 'Hello' , flush = True ))
loop . call_soon ( partial ( print , 'Greeting' , flush = True ))
print ( f 'Done: { fut . done () } ' )
await asyncio . sleep ( 0 )
print ( f 'Done: { fut . done () } , Result: { fut . result () } ' )
In : await b1 ()
Done : False
Set to : the result
Hello
Greeting
Done : True , Result : the result
这个例子输出的比较复杂,我挨个分析:
call_soon
可以用来设置任务的结果:在 mark_done
里面设置
通过 2 个 print 可以感受到 call_soon
支持参数。
最重要的就是输出部分了,首先 fut.done () 的结果是 False,因为还没到下个事件循环,sleep (0) 就可以切到下次循环,这样就会调用三个 call_soon
回调,最后再看 fut.done () 的结果就是 True,而且 < code>fut.result () 可以拿到之前在 mark_done
设置的值了
call_later
安排回调在给定的时间 (单位秒) 后执行:
async def b2 ():
loop = asyncio . get_event_loop ()
fut = asyncio . Future ()
loop . call_later ( 2 , mark_done , fut , 'the result' )
loop . call_later ( 1 , partial ( print , 'Hello' ))
loop . call_later ( 1 , partial ( print , 'Greeting' ))
print ( f 'Done: { fut . done () } ' )
await asyncio . sleep ( 2 )
print ( f 'Done: { fut . done () } , Result: { fut . result () } ' )
In : await b2 ()
Done : False
Hello
Greeting
Set to : the result
Done : True , Result : the result
这次要注意 3 个回调的延迟时间时间要<=sleep 的,要不然还没来及的回调程序就结束了
call_at
安排回调在给定的时间执行,注意这个时间要基于loop.time()
获取当前时间:
async def b3 ():
loop = asyncio . get_event_loop ()
now = loop . time ()
fut = asyncio . Future ()
loop . call_at ( now + 2 , mark_done , fut , 'the result' )
loop . call_at ( now + 1 , partial ( print , 'Hello' , flush = True ))
loop . call_at ( now + 1 , partial ( print , 'Greeting' , flush = True ))
print ( f 'Done: { fut . done () } ' )
await asyncio . sleep ( 2 )
print ( f 'Done: { fut . done () } , Result: { fut . result () } ' )
In : await b3 ()
Done : False
Hello
Greeting
Set to : the result
Done : True , Result : the result
前面的代码都是异步的,就如 sleep,需要用asyncio.sleep
而不是阻塞的time.sleep
,如果有同步逻辑,怎么;利用 asyncio 实现并发呢?答案是用run_in_executor
。在一开始我说过开发者创建 Future 对象情况很少,主要是用run_in_executor
,就是让同步函数在一个执行器 (executor) 里面运行:
def a ():
time . sleep ( 1 )
return 'A'
async def b ():
await asyncio . sleep ( 1 )
return 'B'
def show_perf ( func ):
print ( '*' * 20 )
start = time . perf_counter ()
asyncio . run ( func ())
print ( f ' { func . __name__ } Cost: { time . perf_counter () - start } ' )
async def c1 ():
loop = asyncio . get_running_loop ()
await asyncio . gather (
loop . run_in_executor ( None , a ),
In : show_perf ( c1 )
********************
c1 Cost : 1.0027242230000866
可以看到用run_into_executor
可以把同步函数逻辑转化成一个协程,且实现了并发。这里要注意细节,就是函数 a 是普通函数,不能写成协程,下面的定义是错误的,不能实现并发:
async def a ():
time . sleep ( 1 )
return 'A'
因为 a 里面没有异步代码,就不要用async def
来定义。需要把这种逻辑用loop.run_in_executor
封装到协程:
async def c ():
loop = asyncio . get_running_loop ()
return await loop . run_in_executor ( None , a )
大家理解了吧?
loop.run_in_executor(None, a)
这里面第一个参数是要传递concurrent.futures.Executor
实例的,传递 None 会选择默认的 executor:
In : loop . _default_executor
Out : < concurrent . futures . thread . ThreadPoolExecutor at 0x112b60e80 >
当然我们还可以用进程池,这次换个常用的文件读写例子,并且用:
async def c3 ():
loop = asyncio . get_running_loop ()
with concurrent . futures . ProcessPoolExecutor () as e :
print ( await asyncio . gather (
loop . run_in_executor ( e , a ),
In : show_perf ( c3 )
********************
[ 'A' , 'B' ]
c3 Cost : 1.0218078890000015
上一个小节用的run_in_executor
就如它方法的名字所示,把协程放到了一个执行器里面,可以在一个线程池,也可以在一个进程池。另外还可以使用run_coroutine_threadsafe
在其他线程执行协程(这是线程安全的):
def start_loop ( loop ):
asyncio . set_event_loop ( loop )
loop . run_forever ()
def shutdown ( loop ):
loop . stop ()
async def b1 ():
new_loop = asyncio . new_event_loop ()
t = Thread ( target = start_loop , args = ( new_loop ,))
t . start ()
future = asyncio . run_coroutine_threadsafe ( a (), new_loop )
print ( future )
print ( f 'Result: { future . result ( timeout = 2 ) } ' )
new_loop . call_soon_threadsafe ( partial ( shutdown , new_loop ))
In : await b1 ()
< Future at 0x107edf4e0 state = pending >
Result : A
这里面有几个细节要注意:
协程应该从另一个线程中调用,而非事件循环运行所在线程,所以用 asyncio.new_event_loop ()
新建一个事件循环
在执行协程前要确保新创建的事件循环是运行着的,所以需要用 start_loop
之类的方式启动循环
接着就可以用 asyncio.run_coroutine_threadsafe
执行协程 a 了,它返回了一个 Future 对象
可以通过输出感受到 future 一开始是 pending 的,因为协程 a 里面会 sleep 1 秒才返回结果
用 future.result (timeout=2)
就可以获得结果,设置 timeout 的值要大于 a 协程执行时间,要不然会抛出 TimeoutError
一开始我们创建的新的事件循环跑在一个线程里面,由于 loop.run_forever
会阻塞程序关闭,所以需要结束时杀掉线程,所以用 call_soon_threadsafe
回调函数 shutdown
去停止事件循环
这里再说一下call_soon_threadsafe
,看名字就知道它是线程安全版本的call_soon
,其实就是在另外一个线程里面调度回调。BTW, 其实asyncio.run_coroutine_threadsafe
底层也是用的它。
本文代码可以在 mp 项目 找到
https://github.com/python/cpython/blob/3.7/Modules/_asynciomodule.c#L286
https://ipython.readthedocs.io/en/stable/interactive/autoawait.html
https://docs.python.org/3.8/library/asyncio-task.html