Запуск сопрограмм в другом, заданном цикле событий
Синтаксис
:
import asyncio
await asyncio.run_coroutine_threadsafe(coro, loop)
Параметры
:
Возвращаемое значение
:
Описание
:
Функция
run_coroutine_threadsafe()
модуля
asyncio
отправляет сопрограмму
coro
в заданный цикл событий
loop
. Функция поточно-ориентирована.
Возвращает объект
concurrent.futures.Future()
, результаты которого можно дождаться и получить с помощью метода
future.result()
.
Функция
asyncio.run_coroutine_threadsafe()
предназначена для вызова сопрограмм в потоке, отличным от того, в котором запущен основной цикл событий. Для этой цели необходимо явное создание отдельного потока.
# Создать сопрограмму
coro = asyncio.sleep(1, result=3)
# Отправить сопрограмму в заданный цикл
future = asyncio.run_coroutine_threadsafe(coro, loop)
# Ждать результата с необязательным аргументом тайм-аута
assert future.result(timeout) == 3
Если в
сопрограмме
возникает исключение, то будет уведомлено возвращенное
concurrent.futures.Future
. Функцию
asyncio.run_coroutine_threadsafe()
также можно использовать для отмены задачи в цикле событий:
try:
result = future.result(timeout)
except asyncio.TimeoutError:
print('Сопрограмма заняла много времени, отменив задачу...')
future.cancel()
except Exception as exc:
print(f'Сопрограмма вызвала исключение: {exc:!r}')
else:
print(f'Сопрограмма вернула результат: {result:!r}')
В отличие от других функций модуля
asyncio
, эта функция требует явной передачи аргумента
цикла событий
loop
.
При помощи этой функции можно запустить сопрограмму в цикле событий, созданным в другом потоке, как это делает функция
asyncio.to_thread()
, доступная с версии Python 3.9. Только для рассматриваемой функции необходимо явное создание потока.
Запустить сопрограмму в отдельном потоке можно так же методом цикла событий
loop.run_in_executor()
при помощи низкоуровнего API.
Пример запуска асинхронной задачи в заданном цикле событий.
В примере функция
worker()
запускается явно в задаче в текущем цикле событий и отправляется при помощи функции
asyncio.run_coroutine_threadsafe()
в новый цикл событий, созданный в отдельном потоке.
import asyncio, threading
async def worker(name, delay):
Асинхронная функция, которую будем запускать
в основном и отдельном потоке
# получаем имя потока
th_name = threading.current_thread().name
print(f'Start {name}; ожидание {delay}; поток: {th_name}')
res = await asyncio.sleep(delay, result=delay)
print(f'Done {name}; ожидание {delay}')
return name, res
async def main(new_loop):
# список с будущими результатами
results = []
# запускаем сопрограмму worker('Thread', 1) в цикле событий
# `new_loop` другого потока. Функция `run_coroutine_threadsafe()`
# проталкивает `worker()` в поток с циклом событий `new_loop`
future1 = asyncio.run_coroutine_threadsafe(worker('Thread', 1), new_loop)
# `future1` добавляем в список с результатами
results.append(future1)
# создаем асинхронную задачу в текущем цикле событий
task = asyncio.create_task(worker('Task', 1.5))
# ожидаем результат от асинхронной задачи
future2 = await task
# `future2` добавляем в список с результатами
results.append(future2)
print('\nРезультаты:')
# проходимся по списку с результатами `results`
for future in results:
# результаты потоков и цикла событий не
# совместимы, по этому извлекаем их по разному
if type(future) == tuple:
# результаты цикла событий
print(f'Задача {future[0]}; результат {future[1]}')
else:
# результаты потока
res = future.result()
print(f'Задача {res[0]}; результат {res[1]}')
# останавливаем цикл событий `new_loop`
# в отдельном потоке
new_loop.call_soon_threadsafe(new_loop.stop)
if __name__ == '__main__':
# получаем новый цикл событий
new_loop = asyncio.new_event_loop()
# создаем поток с запущенным новым циклом событий
thread = threading.Thread(target=new_loop.run_forever)
# запускаем поток
thread.start()
# Запускаем основной цикл событий и передаем
# в точку входа `main()` новый цикл событий `new_loop`
# для использования в `run_coroutine_threadsafe()`
asyncio.run(main(new_loop))
# Start Task; ожидание 1.5; поток: MainThread
# Start Thread; ожидание 1; поток: Thread-1
# Done Thread; ожидание 1