def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())
Executor
子类使用最多 max_workers 个线程的线程池来异步执行调用。
所有排入 ThreadPoolExecutor
的队列的线程将在解释器退出之前被合并。 请注意执行此操作的退出处理句柄会在在任何使用 atexit
添加的退出处理句柄 之前 被执行。 这意味着主线程中的异常必须被捕获和处理以便向线程发出信号使其能够优雅地退出。 由于这个原理,建议不要将 ThreadPoolExecutor
用于长期运行的任务。
initializer 是在每个工作者线程开始处调用的一个可选可调用对象。 initargs 是传递给初始化器的元组参数。任何向池提交更多工作的尝试, initializer 都将引发一个异常,当前所有等待的工作都会引发一个 BrokenThreadPool
。
在 3.5 版更改: 如果 max_workers 为 None
或没有指定,将默认为机器处理器的个数,假如 ThreadPoolExecutor
侧重于I/O操作而不是CPU运算,那么可以乘以 5
,同时工作线程的数量可以比 ProcessPoolExecutor
的数量高。
3.6 新版功能: 添加 thread_name_prefix 参数允许用户控制由线程池创建的 threading.Thread
工作线程名称以方便调试。
在 3.7 版更改: 加入 initializer 和*initargs* 参数。
在 3.8 版更改: max_workers 的默认值已改为 min(32, os.cpu_count() + 4)
。 这个默认值会保留至少 5 个工作线程用于 I/O 密集型任务。 对于那些释放了 GIL 的 CPU 密集型任务,它最多会使用 32 个 CPU 核心。这样能够避免在多核机器上不知不觉地使用大量资源。
现在 ThreadPoolExecutor 在启动 max_workers 个工作线程之前也会重用空闲的工作线程。
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistant-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor
ProcessPoolExecutor
类是 Executor
的子类,它使用进程池来异步地执行调用。 ProcessPoolExecutor
会使用 multiprocessing
模块,这允许它绕过 全局解释器锁 但也意味着只可以处理和返回可封存的对象。
__main__
模块必须可以被工作者子进程导入。这意味着 ProcessPoolExecutor
不可以工作在交互式解释器中。
从可调用对象中调用 Executor
或 Future
的方法提交给 ProcessPoolExecutor
会导致死锁。
class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)
一个 Executor
子类使用最多由 max_workers 个进程组成的进程池异步执行调用。 如果 max_workers 是 None
或未给出,则默认为机器上的处理器数量。如果 max_workers 小于或等于 0
,则会引发 ValueError
。在 Windows 系统中,max_workers 必须小于或等于 61
,否则会引发 ValueError
。如果 max_workers 为 None
,则默认情况下最大为 61
,即使有更多的处理器可用。mp_context 可以是多进程上下文,也可以是 None。它将用于启动工作者。如果 mp_context 为``None`` 或未给出,则使用默认的多进程上下文。
initializer 是一个可选的可调用对象,它会在每个工作进程启动时被调用;initargs 是传给 initializer 的参数元组。 如果 initializer 引发了异常,则所有当前在等待的任务以及任何向进程池提交更多任务的尝试都将引发 BrokenProcessPool
。
max_tasks_per_child 是指定单个进程在其退出并替换为新工作进程之前可以执行的最大任务数量的可选参数。 在默认情况下 max_tasks_per_child 为 None
表示工作进程将存活与进程池一样长的时间。 当指定了最大数量时,则如果不存在 mp_context 形参则将默认使用 "spawn" 多进程启动方法。 此特性不能兼容 "fork" 启动方法。
在 3.3 版更改: 如果其中一个工作进程被突然终止,BrokenProcessPool
就会马上触发。 可预计的行为没有定义,但执行器上的操作或它的 future 对象会被冻结或死锁。
在 3.7 版更改: 添加 mp_context 参数允许用户控制由进程池创建给工作者进程的开始方法 。
加入 initializer 和*initargs* 参数。
在 3.11 版更改: 增加了 max_tasks_per_child 参数以允许用户控制进程池中工作进程的生命期。
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
result(timeout=None)
返回调用所返回的值。 如果调用尚未完成则此方法将等待至多 timeout 秒。 如果调用在 timeout 秒内仍未完成,则将引发 TimeoutError
。 timeout 可以为整数或浮点数。 如果 timeout 未指定或为 None
,则不限制等待时间。
如果 future 在完成前被取消则 CancelledError
将被触发。
如果调用引发了一个异常,这个方法也会引发同样的异常。
exception(timeout=None)
返回调用所引发的异常。 如果调用尚未完成则此方法将等待至多 timeout 秒。 如果调用在 timeout 秒内仍未完成,则将引发 TimeoutError
。 timeout 可以为整数或浮点数。 如果 timeout 未指定或为 None
,则不限制等待时间。
如果 future 在完成前被取消则 CancelledError
将被触发。
如果调用正常完成那么返回 None
。
add_done_callback(fn)
附加可调用 fn 到 future 对象。当 future 对象被取消或完成运行时,将会调用 fn,而这个 future 对象将作为它唯一的参数。
加入的可调用对象总被属于添加它们的进程中的线程按加入的顺序调用。如果可调用对象引发一个 Exception
子类,它会被记录下来并被忽略掉。如果可调用对象引发一个 BaseException
子类,这个行为没有定义。
如果 future 对象已经完成或已取消,fn 会被立即调用。
下面这些 Future
方法用于单元测试和 Executor
实现。
set_running_or_notify_cancel()
这个方法只可以在执行关联 Future
工作之前由 Executor
实现调用或由单测试调用。
如果此方法返回 False
则 Future
已被取消,即 Future.cancel()
已被调用并返回 True
。 任何等待 Future
完成 (即通过 as_completed()
或 wait()
) 的线程将被唤醒。
如果此方法返回 True
则 Future
没有被取消并已被置为正在运行的状态,即对 Future.running()
的调用将返回 True
。
这个方法只可以被调用一次并且不能在调用 Future.set_result()
或 Future.set_exception()
之后再调用。
set_exception(exception)
设置 Future
关联工作的结果给 Exception
exception 。
这个方法只可以由 Executor
实现和单元测试使用。
在 3.8 版更改: 如果 Future
已经完成则此方法会引发 concurrent.futures.InvalidStateError
。
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)
等待由 fs 指定的 Future
实例(可能由不同的 Executor
实例创建)完成。 重复传给 fs 的 future 会被移除并将只返回一次。 返回一个由集合组成的具名 2 元组。 第一个集合的名称为 done
,包含在等待完成之前已完成的 future(包括正常结束或被取消的 future)。 第二个集合的名称为 not_done
,包含未完成的 future(包括挂起的或正在运行的 future)。
timeout 可以用来控制返回前最大的等待秒数。 timeout 可以为 int 或 float 类型。 如果 timeout 未指定或为 None
,则不限制等待时间。
return_when 指定此函数应在何时返回。它必须为以下常数之一:
concurrent.futures.as_completed(fs, timeout=None)
返回一个包含 fs 所给出的 Future
实例(可能由不同的 Executor
实例创建)的迭代器,这些实例会在完成时产生 future 对象(包括正常结束或被取消的 future 对象)。 任何由 fs 给出的重复的 future 对象将只被返回一次。 任何在 as_completed()
被调用之前完成的 future 对象将优先被产生。 如果 __next__()
被调用并且在对 as_completed()
的原始调用 timeout 秒之后其结果仍不可用则所返回的迭代器将引发 TimeoutError
。 timeout 可以为整数或浮点数。 如果 timeout 未指定或为 None
,则不限制等待时间。
PEP 3148 -- future 对象 - 异步执行指令。该提案描述了Python标准库中包含的这个特性。
exception concurrent.futures.TimeoutError
TimeoutError
的一个已被弃用的别名,会在 future 操作超出了给定的时限时被引发。
在 3.11 版更改: 这个类是 TimeoutError
的别名。
exception concurrent.futures.BrokenExecutor
当执行器被某些原因中断而且不能用来提交或执行新任务时就会被引发派生于 RuntimeError
的异常类。
3.7 新版功能.
exception concurrent.futures.thread.BrokenThreadPool
当 ThreadPoolExecutor
中的其中一个工作者初始化失败时会引发派生于 BrokenExecutor
的异常类。
3.7 新版功能.
exception concurrent.futures.process.BrokenProcessPool
当 ThreadPoolExecutor
中的其中一个工作者不完整终止时(比如,被外部杀死)会引发派生于 BrokenExecutor
( 原名 RuntimeError
) 的异常类。
3.3 新版功能.
This page is licensed under the Python Software Foundation License Version 2.
Examples, recipes, and other code in the documentation are additionally licensed under the Zero Clause BSD License.
See History and License for more information.
The Python Software Foundation is a non-profit corporation.
Please donate.
最后更新于 10月 04, 2023.
Found a bug?