添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

Python 线程允许程序的不同部分同时运行,并可以简化设计。如果你对 Python 有一些经验,并且希望使用线程为程序加速,那么本文就是为你准备的!

什么是线程?

线程是一个独立的流,这意味着你的程序可以同时做两件事,但是,对于大多数Python程序,不同的线程实际上并不同时执行,它们只是看起来像是同时执行。

人们很容易认为线程是在程序上运行两个(或更多)不同的处理器,每个处理器同时执行一个独立的任务。这种看法大致正确,线程可能在不同的处理器上运行,但一个处理器一次只能运行一个线程。

要同时运行多个任务,不能用Python的标准方式实现,可以用不同的编程语言,或者多个进程实现,这样做的开发成本就高了。

由于用CPython实现了Python业务,线程可能不会加速所有任务,这是GIL(全称Global Interpreter Lock)的原因,一次只能运行一个Python线程。

如果某项任务需要花费大量时间等待外部事件,那么就可以应用多线程。如果是需要对CPU占用高并且花费很少时间等待外部事件,多线程可能枉费。

对于用Python编写并在标准CPython实现上运行的代码,这是正确的。如果你的线程是用C编写的,那么它们就能够释放GIL、并发运行。如果你在不同的Python实现上运行,也可以查看文档,了解它如何处理线程。

如果你正在运行一个标准的Python程序,只使用Python编写,并且有一个CPU受限的问题,那么你应该用多进程解决此问题。

将程序架构为使用线程也可以提高设计的清晰度。你将在下文中学习的大多数示例不一定会运行得更快,因为它们使用线程。在这些示例中使用线程有助于使设计更清晰、更易于推理。

所以,让我们停止谈论线程并开始使用它!

创建一个线程

现在你已经知道了什么是线程,让我们来学习如何制作线程。Python标准库提供了线程模块 threading ,它包含了你将在本文中看到的大部分内容。在这个模块中, Thread 是对线程的封装,提供了简单的实现接口。

要创建一个线程,需要创建 Thread 的实例,然后调用它的 .start() 方法:

import logging
import threading
import time
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    logging.info("Main : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main : before running thread")
    x.start()
    logging.info("Main : wait for the thread to finish")
    # x.join()
    logging.info("Main : all done")

如果你查看日志,可以看到在 main 部分正在创建和启动线程:

x = threading.Thread(target=thread_function, args=(1,))
x.start()

用函数 thread_function() arg(1,) 创建一个 Thread 实例。在本文中用整数作为线程的名称, threading.get_ident() 可以返回线程的名称,但可读性较差。

thread_function() 函数的作用不大,它只是记录一些日志消息,在这些消息之间加上 time.sleep()

当你执行此程序时,输出将如下所示:

$ ./single_thread.py
Main : before creating thread
Main : before running thread
Thread 1: starting
Main : wait for the thread to finish
Main : all done
Thread 1: finishing

你会注意到代码的 main 部分结束之后, Thread 才结束。后面会揭示这么做的原因。

守护线程

在计算机科学中, daemon 是在后台运行的程序。

Python的 threading 模块对 daemon 有更具体的含义。当程序退出时,守护线程会立即关闭。考虑这些定义的一种方法是将 daemon 视为在后台运行的线程,而不必担心关闭它。

如果程序中正在执行的 Threads 不是 daemons ,则程序将在终止之前等待这些线程完成。然而,如果 Threads daemons ,当程序退出时,它们就终止了。

让我们更仔细地看看上面程序的输出,最后两行是有点意思的。当运行这个程序时,在 __main__ 打印完 all done 后以及线程结束之前会暂停大约2秒。

这个暂停是Python等待非后台线程完成。当Python程序结束时,关闭操作是清除线程中的程序。

如果查看 threading 模块的源代码,你将看到 threading._shutdown() 方法,它会遍历所有正在运行的线程,并在每一个没有设置 daemon 标志的线程上调用 .join() 方法。

因此,程序在退出时会等待,因为线程本身正在sleep( time.sleep(2) )中。一旦完成并打印了消息, .join() 将返回,程序才可以退出。

通常,这是你想要的,但是我们还有其他的选择。让我们首先使用一个 daemon 线程来重复这个程序。你可以修改 Thread 实例化时的参数,添加 daemon=True :

x = threading.Thread(target=thread_function, args=(1,), daemon=True)

现在运行程序时,应看到以下输出:

$ ./daemon_thread.py
Main : before creating thread
Main : before running thread
Thread 1: starting
Main : wait for the thread to finish
Main : all done

与前面不同的是,前面所输出的最后一行在这里没有了。 thread_function() 没有执行完,它是一个 daemon 线程,所以当 _main__ 执行到达它的末尾时,程序结束,后台线程也就结束了。

线程实例的 .join() 方法

守护线程很方便,但是,如果要实现线程完全执行,而不是被迫退出,应该怎么办?现在让我们回到原始程序,看看注释掉的那一行:

# x.join()

要让一个线程等待另一个线程完成,可以调用 .join() 。取消对该行的注释,主线程将暂停并等待线程 x ,直到它运行结束。

你是否在程序中用守护线程或普通线程测试了这个问题?这并不重要。如果执行某个线程的 .join() 方法,该语句将一直等待,直到每个线程都完成。

使用多线程

到目前为止,示例代码只使用了两个线程:一个是主线程,另一个是以 threading.Thread 对象开始的线程。

通常,您会希望启动更多线程并让它们做一些有趣的工作。我们先来看看比复杂的方法,然后再看比较简单的方法。

启动多线程比较复杂的方法是你已经知道的:

import logging
import threading
import time
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    threads = list()
    for index in range(3):
        logging.info("Main : create and start thread %d.", index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()
    for index, thread in enumerate(threads):
        logging.info("Main : before joining thread %d.", index)
        thread.join()
        logging.info("Main : thread %d done", index)

这段代码使用与上面看到的相同机制来启动线程,创建一个 Thread 实例对象,然后调用 .start() 。程序中生成一个由 Thread 实例组成的列表,后面再调用每个实例 .join() 方法。

多次运行此代码可能会产生一些有趣的结果。下面是我的机器的输出示例:

$ ./multiple_threads.py
Main : create and start thread 0.
Thread 0: starting
Main : create and start thread 1.
Thread 1: starting
Main : create and start thread 2.
Thread 2: starting
Main : before joining thread 0.
Thread 2: finishing
Thread 1: finishing
Thread 0: finishing
Main : thread 0 done
Main : before joining thread 1.
Main : thread 1 done
Main : before joining thread 2.
Main : thread 2 done

如果仔细检查输出,你将看到所有三个线程都按照你可能期望的顺序开始,但在本例中,它们是按照相反的顺序完成的!多次运行将产生不同的排序,可以通过查找 Thread x: finishing 消息来了解每个线程何时完成。

线程的运行顺序由操作系统决定,很难预测,它可能(而且很可能)因运行而异,因此在设计使用线程的算法时需要注意这一点。

幸运的是,Python提供了几个模块,你稍后将看到这些模块用来帮助协调线程并使它们一起运行。在此之前,让我们看看如何更简单地管理一组线程。

使用ThreadPoolExecutor

有一种比上面看到的更容易启动多线程的方法,它被称为 ThreadPoolExecutor ,是标准库中的 concurrent.futures 的一员(从Python3.2开始)。

创建它的最简单方法是使用上下文管理器的 with 语句,用它实现对线程池的创建和销毁。

下面是为了使用 ThreadPoolExecutor 而重写的上一个示例中的 __main__ 部分代码:

import concurrent.futures
# [rest of code]
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))

代码创建了一个 ThreadPoolExecutor 作为上下文管理器,告诉它需要在线程池中有多少个工作线程。然后它使用 .map() 遍历可迭代对象,在上面的例子中是 range(3) ,将每个可迭代对象传递给线程池中的一个线程。

with 语句块的尾部,默认会调用 ThreadPoolExecutor 的每个线程的 .join() 方法,建议你尽可能使用 ThreadPoolExecutor 作为上下文管理器,这样你就永远不会忘记对执行线程 .join()

注意:使用 ThreadPoolExecutor 可能会导致一些混乱的错误。

例如,如果调用不带参数的函数,但在 .map() 中传了参数,则线程应当抛出异常。

不幸的是, ThreadPoolExecutor 隐藏了该异常,并且(在上面的情况下)程序将在没有输出的情况下终止。一开始调试可能会很混乱。

运行正确的示例代码将生成如下输出:

$ ./executor.py
Thread 0: starting
Thread 1: starting
Thread 2: starting
Thread 1: finishing
Thread 0: finishing
Thread 2: finishing

同样,请注意 Thread 1 是在 Thread 0 之前完成的,线程执行顺序的调度是由操作系统完成的,所遵循的计划也不易理解。

竞态条件

在讨论Python线程的其他特性之前,让我们先讨论一下编写线程程序时遇到的一个更困难的问题:竞态条件。

一旦你了解了什么是竞态条件,并看到了正在发生的情况,然后就使用标准库提供的模块,以防止这些竞态条件的出现。

当两个或多个线程访问共享数据或资源时,可能会出现竞态情况。在本例中,你将创建一个每次都发生的大型竞态条件,但请注意,大多数它并不是很明显。示例中的情况通常很少发生,而且会产生令人困惑的结果。可以想象,因为竞态条件而引起的bug很难被发现。

幸运的是,在下述示例中竞态问题每次都会发生,你将详细地了解它以便解释发生了什么。

对于本例,将编写一个更新数据库的类。你不会真的有一个数据库:你只是要伪造它,因为这不是本文的重点。

FakeDatabase 类中有 .__init__() .update() 方法:

class FakeDatabase:
    def __init__(self):
        self.value = 0
    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)

FakeDatabase 中的属性 .value ,用于作为竞态条件中共享的数据。

.__init__() 中将 .value 值初始化为 0. ,到目前为止,一切正常。

.update() 看起来有点奇怪,它模拟从数据库中读取一个值,对其进行一些计算,然后将一个新值写回数据库。

所谓从数据库中读取,即将 .value 的值复制到本地变量。计算就是在原值上加1,然后 .sleep() 一小会儿。最后,它通过将本地值复制回 .value ,将值写回去。

下面是 FakeDatabase 的使用方法:

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)

程序中创建了两个 ThreadPoolExecutor ,然后对每个线程调用 .submit() ,告诉它们运行 database.update()

.submit() 有一个明显特征,它允许将位置参数和命名参数传给线程中运行的函数:

.submit(function, *args, **kwargs)

在上面的用法中, index 作为第一个也是唯一一个位置参数传给 database.update() 。你将在本文后面看到,可以用类似的方式传多个参数。

由于每个线程都运行 .update() ,而 .update() 会让 .value 的值加1,因此在最后打印时,你可能会希望 database.value 为2。但如果是这样的话,你就不会看这个例子了。如果运行上述代码,则输出如下:

$ ./racecond.py
Testing unlocked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing unlocked update. Ending value is 1.

你可能已经预料到这种情况会发生,但是让我们来看看实际情况的细节,因为这将使这个问题的解决方案更容易理解。

单线程

在用两个线程深入讨论这个问题之前,让我们先退一步,谈谈线程工作流程的一些细节。

我们不会在这里深入讨论所有的细节,因为这种全面深入的讨论现在并不重要。我们还将简化一些事情,这种做法虽然在技术上并不准确,但会让你对正在发生的事情有正确的认识。

当你告诉 ThreadPoolExecutor 运行每个线程时,也就是告诉它要运行哪个函数以及要传给它的参数: executor.submit(database.update, index)

其结果是线程池中的每个线程都将调用 database.update(index) 。注意, database __main__ 中创建的 FakeDatabase 实例对象,调用它的方法 .update()

每个线程都将引用同一个 FakeDatabase 的实例 database ,每个线程还将有一个唯一的值 index 。为了让上述过程更容易理解,请看下图:

image-25

当某线程开始运行 .update() 时,它有此方法的本地的数据,即 .update() 中的 local_copy 。这绝对是件好事,否则,在两个线程中运行同一个函数就会互相干扰了。这意味着该函数的所有作用域(或本地)变量对于线程来说都是安全的。

现在,你已经理解,如果使用单个线程和对 .update() 的单个调用来运行上面的程序会发生什么情况。

如果只运行一个线程,如下图所示,会一步一步地执行 .update() 。下图中,语句显示在上面,下面用图示方式演示了线程中的 local_value 和共享的 database.value 中的值的变化:

image-26

按照时间顺序,从上到下观察上面的示意图,从创建线程 Thread 1 开始,到 Thread 1 结束终止。

Thread 1 启动时, FakeDatabase.value 为零。方法中的第一行代码 local_copy=self.value 将0复制到局部变量。接下来,使用 local_copy+=1 语句增加 local_copy 的值。你可以看到 Thread 1 中的 .value 值为1。

然后,调用下一个 time.sleep() ,这将使当前线程暂停并允许其他线程运行。因为在这个例子中只有一个线程,所以这没有影响。

Thread 1 唤醒并继续时,它将新值从 local_copy 复制到 FakeDatabase.value ,然后线程完成。你可以看到 database.value 为1。

到目前为止,一切正常。你只运行了一次 .update() 并且将 FakeDatabase.value 递增为1。

两个线程

回到竞态条件,两个线程并行,但不是同时运行。每个线程都有自己的 local_copy ,并指向相同的 database ,正是这个共享数据库对象导致了这些问题。

程序还是从 Thread 1 执行 .update() 开始:

image-27

Thread 1 调用 time.sleep() 时,它允许另一个线程开始运行。这就是事情变得有趣的地方。

Thread 2 启动并执行相同的操作。它也将 database.value 复制到其私有的 local_copy ,而此时共享的 database.value 尚未更新:

image-28

Thread 1 进入睡眠状态时,共享的 database.value 仍然未被修改,还是0,而此时的 local_copy 的两个私有版本的值都为1。

Thread 1 现在醒来并保存其 local_copy 的值,然后线程终止,给 Thread 2 机会。 Thread 2 不知道在它睡眠时 Thread 1 运行并更新了 database.value 的值。 Thread 2 也将它的 local_copy 值存储到 database.value 中,并将其设置为1:

image-29

这两个线程交替访问一个共享对象,覆盖彼此的结果。当一个线程释放内存或在另一个线程完成访问之前关闭文件句柄时,可能会出现类似的竞态。

为什么这不是一个愚蠢的示例

上面的例子是刻意而为,目的是确保每次运行程序时都会发生竞态。因为操作系统可以在任何时候交换线程,所以在读取 x 的值之后,并且在写回递增的值之前,可以中断类似 x=x+1 的语句。

发生这种情况的原因细节非常有趣,但这篇文章的其余部分并不需要这些细节,所以可以跳过这个隐藏的部分。

既然你已经看到了运行过程中的竞态条件,让我们找出解决问题的方法!

使用锁实现同步

有很多方法可以避免或解决竞态。你不会在这里看到所有这些方法,但是有一些方法是经常使用的。让我们从 Lock 开始。

要解决上述竞态条件,需要找到一种方法,使得在代码的“读-修改-写”操作中一次只允许一个线程。最常见的方法是使用Python中名为 Lock 的方法。在其他的一些语言中,类似的被称为 Mutex Mutex 源于MUTual EXclusion,这正是 Lock 的作用。

Lock 像是通行证,一次只能有一个线程拥有 Lock ,任何其他想要 Lock 的线程都必须等到 Lock 的所有者放弃它。

执行此操作的基本函数是 .acquire() .release() 。线程将调用 my_lock.acquire() 来获取自己的锁。如果锁已经被其他线程所有,则将等待它被释放。这里有一点很重要,如果一个线程得到了锁,但尚未返回,你的程序将被卡住。你稍后会读到更多关于这方面的内容。

幸运的是,Python的 Lock 也将作为上下文管理器运行,因此你可以在一个带有with的语句中使用它,并且当with代码块由于任何原因退出时,锁也会自动释放。

让我们看看添加了锁的 FakeDatabase ,其所调用函数保持不变:

class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()
    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)
        with self._lock:
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
        logging.debug("Thread %s after release", name)
        logging.info("Thread %s: finishing update", name)

除了添加一堆调试日志以便更清楚地看到锁操作之外,这里的大变化是添加一个名为 ._lock 的属性,它是一个 threading.Lock() 实例对象。这个 ._lock 在未锁定状态下初始化,并由with语句锁定和释放。

这里值得注意的是,运行此方法的线程将一直保持 Lock ,直到完全完成对数据库的更新。在这种情况下,这意味着函数将在复制、更新、休眠时保持锁定,然后将值写回数据库。

如果在日志记录设置为警告级别的情况下运行此版本,你将看到以下内容:

$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 1: starting update
Thread 0: finishing update
Thread 1: finishing update
Testing locked update. Ending value is 2.

看看这个。你的程序终于成功了!

__main__ 中配置日志输出后,可以通过添加以下语句将级别设置为 DEBUG 来打开完整日志记录:

logging.getLogger().setLevel(logging.DEBUG)

在启用 DEBUG 后,运行此程序,如下所示:

$ ./fixrace.py
Testing locked update. Starting value is 0.
Thread 0: starting update
Thread 0 about to lock
Thread 0 has lock
Thread 1: starting update
Thread 1 about to lock
Thread 0 about to release lock
Thread 0 after release
Thread 0: finishing update
Thread 1 has lock
Thread 1 about to release lock
Thread 1 after release
Thread 1: finishing update
Testing locked update. Ending value is 2.

在输出中,你可以看到 Thread 0 得到了锁,并在进入睡眠状态时仍保持锁定。然后 Thread 1 启动并尝试获取相同的锁。因为 Thread 0 仍在持有锁, Thread 1 必须等待。这就是 Lock 的互斥性。

本文其余部分中的许多示例将日志设置为 WARNING DEBUG 级别。我们通常只是 DEBUG 级别的输出,因为 DEBUG 日志可能非常长。在日志记录打开的情况下尝试这些程序,看看它们能做什么。

死锁

在继续探索之前,应该先看看使用锁时的一个常见问题。如你所见,如果已经获取了 Lock ,则对 .acquire() 的二次调用将等到持有 Lock 的线程调用 .release() 。运行此代码时,你认为会发生什么情况?

import threading
l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")

当程序第二次调用 l.acquire() 时,该函数将挂起,等待 Lock 的释放。在本例中,可以通过删除第二次调用来修复死锁,但死锁通常发生在以下两个微妙的事情之一:

  1. 未正确释放 Lock 的错误。
  2. 设计问题,其中一个函数需要由某些函数调用,这些函数可能具有或可能不具有 Lock

第一种情况有时会发生,但使用 Lock 作为上下文管理器会大大减少错误出现的频率。建议尽可能使用上下文管理器编写代码,因为它们有助于避免异常跳过 .release() 调用的情况。

在某些语言中,设计问题可能要复杂一些。值得庆幸的是,Python线程的又一个对象 RLock 就是为这种情况而设计的。它允许线程在调用 .release() 之前多次通过 .acquire() 实现 RLock 。该线程中调用 .release() 的次数与调用 .acquire() 的次数相同。

Lock RLock 是线程中用来防止竞态条件的两个基本工具,还有一些其他工具以不同的方式发挥作用。在你查看它们之前,让我们转到一个稍微不同的问题上。

生产者-消费者线程

生产者-消费者问题(Producer-Consumer Problem,以下简称:PCP)是计算机科学中研究线程或进程同步的代表性问题,下面要通过它的一个变体来了解Python中threading模块提供的各种方法。

对于本例,你将想象一个程序需要从网络读取信息并将其写入磁盘。程序会确定是否要请求信息。它必须监听并接受信息,这些信息不会以正常的速度传入,而是会以突发的方式传入。程序的这一部分叫做生产者。

另一方面,一旦收到信息,你就需要将其写入数据库。数据库访问速度很慢,但这个速度足以跟上信息传输的平均速度。当一大堆信息进来时,访问速度还不够快。这部分是消费者。

在生产者和消费者之间,创建一个 Pipeline ,它将随着你对不同的同步对象的了解而变化。

这是基本的布局。让我们看看使用 Lock 的解决方案。它并不完美,但它使用的工具是你已经知道的,所以这是一个很好的开始。

使用锁的PCP

因为这是一篇关于Python的 threading 模块的文章,而且你刚刚阅读了 Lock 的使用方法,,所以让我们尝试用一两个使用 Lock 的线程来解决这个问题。

一般的设计是,有一个 producer 线程从模拟网络读取消息并将信息放入 Pipeline

import random
SENTINEL = object()
def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(10):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")
    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")

要生成模拟信息, producer 中会生成一个介于1和101(不含101)之间的随机整数,然后调用 pipeline .set_message() ,将其发送到 consumer

producer 还使用 SENTINEL 值作为标记,当向 consumer 发送了10个值,就停止发送。这有点尴尬,但不要担心,在完成这个示例之后,你将看到消除这个 SENTINEL 值的方法。

pipeline 的另一边是消费者:

def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)

consumer pipeline 中读取一条信息并将其写入一个虚拟数据库,在本例中,只是将信息打印到显示器上。如果它得到 SENTINEL 值,就结束函数执行过程,该函数将终止线程。

在看真正有趣 Pipeline 部分之前,这里是 __main__ 的代码,它产生了以下线程:

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)
    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)

这看起来应该相当熟悉,因为它接近前面示例中的 __main__ 代码。

请记住,你可以通过取消注释行打开 DEBUG 日志记录,以查看所有日志记录消息:

# logging.getLogger().setLevel(logging.DEBUG)

通过 DEBUG 日志信息来查看每个线程获取和释放锁的确切位置是值得的。

现在让我们看看将信息从 producer 传递给消费者的管道:

class Pipeline:
    Class to allow a single element pipeline between producer and consumer.
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()
    def get_message(self, name):
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message
    def set_message(self, message, name):
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)

哇!这么多代码。其中相当大的一部分只是日志语句,以便在运行代码时更容易看到发生了什么。下面是删除所有日志记录语句后的相同代码:

class Pipeline:
    Class to allow a single element pipeline between producer and consumer.
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()
    def get_message(self, name):
        self.consumer_lock.acquire()
        message = self.message
        self.producer_lock.release()
        return message
    def set_message(self, message, name):
        self.producer_lock.acquire()
        self.message = message
        self.consumer_lock.release()

这似乎更容易处理。此版本代码中的 Pipeline 有三个成员:

  • .message 存储要传递的信息。
  • .producer_lock threading.Lock 实例对象,在 producer 线程中,用它控制对信息的访问
  • .consumer_lock 也是 threading.Lock 实例对象,它在 consumer 线程控制对信息的访问。

__init__() 初始化这三个成员,然后调用 .consumer_lock 上的 .acquire() 。这是你想开始的状态。允许 producer 添加新信息,但 consumer 需要等待信息出现。

.get_message() .set_messages() 几乎相反。 .get_message() 调用 consumer_lock 上的 .acquire() ,它让 consumer 等待信息准备就绪。

一旦 consumer 获得了 .consumer_lock ,它就会复制出 .message 中的值,然后调用 .producer_lock 上的 .release() ,释放锁,允许 producer 将下一条信息插入到 pipeline 中。

在运行 .set_message() 之前,要注意 .get_message() 中的一个细节,通常以 return self.message 结束方法,但是此处不这样做,看看你能否弄清楚原因。

答案在此。一旦 consumer 调用 .producer_lock.release() ,它就会与 producer 交换位置, producer 开始运行,这种情况可能在 .release() 返回之前发生!这意味着,当函数 returns self.message 时,有比较小的概率会生成下一条信息,因此你将丢失第一条信息。这是另一个竞态的例子。

转到 .set_message() ,可以看到事务的另一面, producer 会用一条信息来调用它,获取 .producer_lock ,设置 .message ,然后调用 consumer_lock 上的 .release() 。这样就使得用户可以读取该值。

将日志设置为 WARNING 并执行代码,看看它是什么样子的:

$ ./prodcom_lock.py
Producer got data 43
Producer got data 45
Consumer storing data: 43
Producer got data 86
Consumer storing data: 45
Producer got data 40
Consumer storing data: 86
Producer got data 62
Consumer storing data: 40
Producer got data 15
Consumer storing data: 62
Producer got data 16
Consumer storing data: 15
Producer got data 61
Consumer storing data: 16
Producer got data 73
Consumer storing data: 61
Producer got data 22
Consumer storing data: 73
Consumer storing data: 22

一开始,你可能会发现奇怪的是, producer consumer 运行之前就收到两条信息。如果回顾一下 producer .set_message() ,你会注意到,当 producer 视图将信息发送到 pipeline 时,会等待 Lock 。这是在 producer 收到信息和日志之后完成的。

producer 尝试发送第二条信息时,它将第二次调用 .set_message() ,并且它将被锁定。

操作系统可以在任何时候交换线程,但它通常会让每个线程在交换之前有一个合理的运行时间。这就是为什么 producer 通常运行到它在第二次调用 .set_message() 时被锁定为止。

但是,一旦某个线程被锁定,操作系统就会将其交换出去,并找到另一个要运行的线程,此时的另一个线程就是 consumer

consumer 调用 .get_message() ,该函数读取信息并调用 .producer_lock 上的 .release() ,从而允许 producer 在下次交换线程时再次运行。

注意,第一条消息是43,这正是 consumer 读的内容,尽管 producer 已经生成了45这条信息。

以上是有限的测试,并没有很好地解决PCP,因为它一次只允许管道中的有一个值。当 producer 收到大量信息时,它将无处安放这些信息。

让我们使用 Queue 寻找一个更好的方法来解决这个问题。

原文链接:https://realpython.com/intro-to-python-threading/,译者:老齐

freeCodeCamp 是捐助者支持的 501(c)(3) 条款下具有免税资格的慈善组织(税号:82-0779546)。

我们的使命:帮助人们免费学习编程。我们通过创建成千上万的视频、文章和交互式编程课程——所有内容向公众免费开放——来实现这一目标。

所有给 freeCodeCamp 的捐款都将用于我们的教育项目,购买服务器和其他服务,以及聘用员工。

你可以 点击此处免税捐款 about:blank 是什么意思 打开 .dat 文件 Node 最新版本 反恶意软件服务 Windows10 产品密钥 Git 切换分支 AppData 文件夹 Windows 10 屏幕亮度 JSON 注释 MongoDB Atlas 教程 Python 字符串转数字 Git 命令 更新 NPM 依赖 谷歌恐龙游戏 CSS 使用 SVG 图片 Python 获取时间 Git Clone 指定分支 JS 字符串反转 React 个人作品网站 媒体查询范围 forEach 遍历数组 撤销 Git Add OSI 七层网络 Event Loop 执行顺序 CMD 删除文件 Git 删除分支 HTML 表格代码 Nano 怎么保存退出 HTML5 模板