添加链接
link管理
链接快照平台
  • 输入网页链接,自动生成快照
  • 标签化管理网页链接

rush 项目的时候,有些地方可以并行化,可以借助 C++ 的多线程来加速程序的执行。多线程的基本概念在一年前整过了,这里只是来看一下 C++ 的多线程该怎么写,顺便查漏补缺。

基本概念

在多线程进入 C++ 标准之前,人们使用 C++ 编写多线程的程序,只能依赖操作系统提供的 API。比如在 Linux 环境下就只能使用 pthread 库实现多线程,因此也一直被诟病。但有了 C++11 的 std::thread 以后,可以通过标准库在语言层面编写多线程程序了,直接的好处就是多线程程序的跨平台移植提供了便利。但是在编译的时候需要注意链接平台相关的线程库,如 g++ demo.cpp -lpthread -o test.o

简单实例

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <iostream>
#include <thread>

void show_info(std::string str) {
std::cout << str << std::endl;
}

int main() {
std::string s{"hello world"};
std::thread t{show_info, s};
t.join();
return 0;
}

以上述程序为例,来详细的剖析一下多线程期间到底发生了什么:

  • 首先引入头文件 thread ,在这个头文件中,C++ 11 提供了创建、管理线程的类和方法;
  • 使用 std::thread 创建线程,并通过列表初始化传入函数名作为构造函数的参数。传入的函数会作为子线程的入口函数,也就是说,当子线程准备就绪之后,就会开始执行这个入口函数。由于函数名表示函数的地址,子线程可以快捷的找到函数地址进而执行。

    我们知道,每个程序都有一个入口。当程序被装载到内存,处于系统态完成一些初始化的工作之后,控制权就转交给程序入口,并以此为标志进入用户态,这是一个程序的开始。同样地,线程也需要有「开始」的地方。作为线程入口的函数,就是线程函数,也就是例子中的 show_info。线程函数必须在启动线程之前,就准备好,否则线程去执行什么呢?并在线程初始化后立即执行。 1

  • 当线程函数返回时,线程也就随之终止了,上述程序中使用 join 衔接方法确保主线程在子线程退出之后才退出,因为主线程会阻塞住,直到该子线程退出为止。如果程序员没有显式的说明线程结束该如何处理,那么线程对象在被销毁时调用的析构函数中,会调用 std::terminate() 函数,销毁当前对象。如果程序写多了,应该不至于犯主线程退出子线程还没结束的低级错误。
  • detach

    前面说过线程的 join 会阻塞调用线程,可以使用 detach 来避免,但一定要做好控制:避免主线程退出子线程还没结束的低级错误。一个 cppreference 官网的例子:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    #include <iostream>
    #include <chrono>
    #include <thread>

    void independentThread()
    {
    std::cout << "Starting concurrent thread.\n";
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "Exiting concurrent thread.\n";
    }

    void threadCaller()
    {
    std::cout << "Starting thread caller.\n";
    std::thread t(independentThread);
    t.detach();
    std::this_thread::sleep_for(std::chrono::seconds(1));
    std::cout << "Exiting thread caller.\n";
    }

    int main()
    {
    threadCaller();
    std::this_thread::sleep_for(std::chrono::seconds(5));
    }

    // Starting thread caller.
    // Starting concurrent thread.
    // Exiting thread caller.
    // Exiting concurrent thread.

    可调用类型

    在创建线程对象时,传入的参数不仅是可被调用执行的函数,类的对象如果能被调用,也是可以作为线程对象的参数,用于构造函数初始化线程对象。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    #include <iostream>
    #include <thread>

    class Task {
    private:
    int cnt;
    public:
    explicit Task()=default;
    explicit Task(int a) : cnt{a} {};
    void operator()() {
    std::cout << this->cnt << std::endl;
    }
    };

    int main() {
    std::thread t{Task{1}};
    t.join();
    return 0;
    }

    因为要调用对象,所以重载了 () 运算符,不然线程不知道去哪个地址执行。此外,构造函数传入的是一个类类型的对象,所以对象会被拷贝到线程的存储空间,而后再开始执行。因此, 类必须做好足够的拷贝控制 ,不然将出现难以调试的 bug, 我大概只知道深浅拷贝,等有时间了去看下移动语义

    当然,不重载 () 运算符,选择类中的函数执行也是可以的,不过需要注意以下两点:

  • 必须显式地使用函数指针,作为 std::thread 构造函数的第一个参数;知道执行哪个函数。
  • 非静态成员函数的第一个参数,实际上是类实例的指针。所以在创建线程时,需要显式地填入这个参数;知道执行的函数在哪个对象。
  • 1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    #include <iostream>
    #include <thread>

    class A{
    private:
    int a;
    public:
    explicit A()=default;
    explicit A(int t) : a{t} {};
    void show_info() {
    std::cout << this->a << std::endl;
    }
    };

    int main() {
    A a{12};
    std::thread t{&A::show_info, &a};
    t.join();
    return 0;
    }

    其他要注意的数据类型

    引用

    如果子线程函数的参数是引用类型,也需要格外注意。由于子线程的数据是主线程的拷贝,因此子线程函数得到的拷贝实际是「线程存储空间中的拷贝的引用」,并不是主线程中的变量,应该使用 std::ref() 来生成正确的引用绑定,否则会报错。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    #include <iostream>
    #include <thread>

    void show_info(std::string& s) {
    std::cout << s << std::endl;
    }

    int main() {
    std::string s{"hello world"};
    std::thread t{show_info, std::ref(s)};
    t.join();
    return 0;
    }

    右值引用和移动语义等我后期开坑了。

    锁与线程安全

    众所周知, 写代码的人都学过操作系统,学过操作系统都知道线程同步 。线程同步一般有三种机制:互斥量、信号量和条件变量,这三者到底什么已经在 这篇博客 中详细的描写过了,所以不再多说。不过当时是用 C 语言写的,现在来了解下 C++ 的写法。

    mutex

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <chrono>

    int counter = 0;
    std::mutex mtx;

    void increase(int time) {
    for (int i = 0; i < time; i++) {
    mtx.lock();
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    counter++;
    mtx.unlock();
    }
    }

    int main(int argc, char** argv) {
    std::thread t1(increase, 100);
    std::thread t2(increase, 100);
    t1.join();
    t2.join();
    std::cout << "counter:" << counter << std::endl;
    return 0;
    }
  • 引入 mutex 头文件,创建 std::mutex 对象 mtx
  • 对于 mtx 对象,任意时刻最多允许一个线程对其进行上锁,上锁后操作变量,就不会出错
  • mtx.try_lock() 是尝试上锁,如果上锁不成功,当前线程不阻塞
  • 在用完锁之后一定记得释放锁,否则会发生死锁现象
  • lock_guard

    为了避免 mutex 忘记解锁等情况,可以使用 std::lock_guard 这个类 只有构造函数和析构函数,搭配 mutex 使用,在创建这个对象时传入锁,调用锁的 lock 函数;变量销毁会调用析构函数,此时调用锁的 unlock 函数,这也就是传说中的 RAII 机制 2

    如下述程序 3 ,避免一个线程意外退出没来得及释放锁,导致另一个线程无法获取资源而死锁。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <chrono>
    #include <stdexcept>

    int counter = 0;
    std::mutex mtx;

    void increase_proxy(int time, int id) {
    for (int i = 0; i < time; i++) {
    std::lock_guard<std::mutex> lk(mtx);
    if (id == 1) {
    throw std::runtime_error("throw excption....");
    }
    // 当前线程休眠1毫秒
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    counter++;
    }
    }

    void increase(int time, int id) {
    try {
    increase_proxy(time, id);
    }
    catch (const std::exception& e){
    std::cout << "id:" << id << ", " << e.what() << std::endl;
    }
    }

    int main(int argc, char** argv) {
    std::thread t1(increase, 100, 1);
    std::thread t2(increase, 100, 2);
    t1.join();
    t2.join();
    std::cout << "counter:" << counter << std::endl;
    return 0;
    }

    lock_guard 与 adopt_lock

    还有一种为了防止死锁的方式是一次性申请所有临界资源的互斥量,只有申请到才能进行之后的操作,而 std::lock 提供了这种实现 4 。此外,为了防止没有锁定或提前释放互斥量导致危险,可以使用 lock_guard 并传入 std::adopt_lock ,前者保证当变量销毁时释放互斥量,后者保证线程已经上锁成功时不再调用 lock() 函数。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    #include <mutex>
    #include <thread>

    struct bank_account {
    explicit bank_account(int balance) : balance(balance) {}
    int balance;
    std::mutex m;
    };

    void transfer(bank_account &from, bank_account &to, int amount) {
    // avoid deadlock in case of self transfer
    if(&from == &to)
    return;
    // lock both mutexes without deadlock
    std::lock(from.m, to.m);
    // make sure both already-locked mutexes are unlocked at the end of scope
    std::lock_guard<std::mutex> lock1(from.m, std::adopt_lock);
    std::lock_guard<std::mutex> lock2(to.m, std::adopt_lock);

    from.balance -= amount;
    to.balance += amount;
    }

    int main() {
    bank_account my_account(100);
    bank_account your_account(50);
    std::thread t1(transfer, std::ref(my_account), std::ref(your_account), 10);
    std::thread t2(transfer, std::ref(your_account), std::ref(my_account), 5);
    t1.join();
    t2.join();
    return 0;
    }

    除了 adopt_lock 之外,还有 try_to_lock defer_lock ,他们都有不同的应用场景,还可以配合使用:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    void print_block (int n, char c) {
    //unique_lock有多组构造函数, 这里std::defer_lock不设置锁状态
    std::unique_lock<std::mutex> my_lock (mtx, std::defer_lock);
    //尝试加锁, 如果加锁成功则执行
    //(适合定时执行一个job的场景, 一个线程执行就可以, 可以用更新时间戳辅助)
    if(my_lock.try_lock()) {
    for (int i = 0; i < n; ++i)
    std::cout << c;
    std::cout << '\n';
    }
    }

    其他锁的内容实在是太多了,还有时间锁、递归锁、 lock_unique ,读写锁的 shared_lock 等等,等哪天用到在整理这些,这里只整理最简单的,详情可以参考 cppreference 5

    条件变量

    如果按照之前 C 语言的写法,条件变量需要注意的是 wait 那边的判断一定是 while 循环。 C 语言风格的代码

    当然,如果按照 C++ 的写法,我们发现条件变量的 wait 方法有 两个参数 ,第二个参数用于接受一个变量,如果继续等待,那么那个变量的取值是 false ,如果不需等待,那么那个变量返回 true

    1
    
    
    
    
        
    
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    #include <iostream>
    #include <mutex>
    #include <thread>
    #include <condition_variable>

    std::mutex g_mutex;
    std::condition_variable g_cond;
    int g_i = 0;
    bool g_running = false;

    void ThreadFunc(int n) {
    for (int i = 0; i < n; ++i) {
    {
    std::lock_guard<std::mutex> lock(g_mutex);
    ++g_i;
    std::cout << "plus g_i by func thread "
    << std::this_thread::get_id() << std::endl;
    }
    }

    // 等待被唤醒
    std::unique_lock<std::mutex> lock(g_mutex);
    std::cout << "wait for exit" << std::endl;

    g_cond.wait(lock, [=] {return g_running;});

    std::cout << "func thread exit" << std::endl;
    }

    int main() {
    int n = 100;
    std::thread t1(ThreadFunc, n);
    std::this_thread::sleep_for(std::chrono::seconds(1));
    for (int i = 0; i < n; ++i) {
    {
    std::lock_guard<std::mutex> lock(g_mutex);
    ++g_i;
    std::cout << "plus g_i by main thread "
    << std::this_thread::get_id() << std::endl;
    }
    }

    // 唤醒
    {
    std::lock_guard<std::mutex> lock(g_mutex);
    g_running = true;
    g_cond.notify_one();
    }

    t1.join();
    std::cout << "g_i = " << g_i << std::endl;
    }

    输出如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    plus g_i by func thread 140476623930944
    plus g_i by func thread 140476623930944
    wait for exit // 表示子线程等待唤醒
    plus g_i by main thread 140476623935296
    plus g_i by main thread 140476623935296
    plus g_i by main thread 140476623935296
    plus g_i by main thread 140476623935296
    plus g_i by main thread 140476623935296
    plus g_i by main thread 140476623935296
    plus g_i by main thread 140476623935296
    plus g_i by main thread 140476623935296
    plus g_i by main thread 140476623935296
    plus g_i by main thread 140476623935296
    plus g_i by main thread 140476623935296
    plus g_i by main thread 140476623935296
    func thread exit // 子线程被唤醒
    g_i = 200

    信号量

    因为一开始我也不知道该怎么去写信号量,所以打开了万能的搜索引擎,看到了关于 C++ 不支持信号量这样的东西 6 。如果想实现信号量,可以通过互斥量和条件变量来实现。而关于信号量和互斥量的区别,在 这篇文章 中已经写明了。那么来实现一个信号量的类 7

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    #include <iostream>
    #include <mutex>
    #include <thread>
    #include <vector>
    #include <condition_variable>

    class Semaphore {
    private:
    std::mutex mutex_;
    std::condition_variable cv_;
    int count_;

    public:
    explicit Semaphore(int count = 0) : count_(count) {
    }

    void Signal() {
    std::unique_lock<std::mutex> lock(mutex_);
    ++count_;
    cv_.notify_one();
    }

    void Wait() {
    std::unique_lock<std::mutex> lock(mutex_);
    // 第二个参数,如果返回 false 继续等待, 如果为 true,可以继续申请资源
    cv_.wait(lock, [=] { return count_ > 0; });
    --count_;
    }
    };

    std::string FormatTimeNow(const char* format) {
    auto now = std::chrono::system_clock::now();
    std::time_t now_c = std::chrono::system_clock::to_time_t(now);
    std::tm* now_tm = std::localtime(&now_c);

    char buf[20];
    std::strftime(buf, sizeof(buf), format, now_tm);
    return std::string(buf);
    }

    Semaphore g_semaphore(3);
    // 防止同时抢占输出资源
    std::mutex g_io_mutex;

    void Worker() {
    g_semaphore.Wait();

    std::thread::id thread_id = std::this_thread::get_id();
    std::string now = FormatTimeNow("%H:%M:%S");
    {
    std::lock_guard<std::mutex> lock(g_io_mutex);
    std::cout << "Thread " << thread_id << ": wait succeeded"
    << " (" << now << ")" << std::endl;
    }
    // Sleep 1 second to simulate data processing.
    std::this_thread::sleep_for(std::chrono::seconds(1));

    g_semaphore.Signal();
    }

    int main() {
    std::vector<std::thread> v;
    for (std::size_t i = 0; i < 3; ++i) {
    v.emplace_back(&Worker);
    }
    for (std::thread& t : v) {
    t.join();
    }
    return 0;
    }
  • 信号量的值为 3,表示能同时申请 3 个资源
  • 当一个线程申请资源后,即执行了 wait 操作, count_ 取值递减,表示有一个资源被占用
  • count_ 取值小于 0 时,调用条件变量的 wait 方法,当先线程等待有了资源被唤醒
  • 当一个线程释放资源后,执行了 signal 操作, count_ 取值递增,表示有一个资源被释放,并执行 notify_one 方法,即唤醒一个等待的线程
  • 参考

  •