rush 项目的时候,有些地方可以并行化,可以借助 C++ 的多线程来加速程序的执行。多线程的基本概念在一年前整过了,这里只是来看一下 C++ 的多线程该怎么写,顺便查漏补缺。
1 2 3 4 5 6 7 8 9 10 11 12 13
#include <iostream> #include <thread> void show_info (std::string str) { std::cout << str << std::endl; } int main () { std::string s{"hello world" }; std::thread t{show_info, s}; t.join (); return 0 ; }
1
当线程函数返回时,线程也就随之终止了,上述程序中使用
join
衔接方法确保主线程在子线程退出之后才退出,因为主线程会阻塞住,直到该子线程退出为止。如果程序员没有显式的说明线程结束该如何处理,那么线程对象在被销毁时调用的析构函数中,会调用
std::terminate()
函数,销毁当前对象。如果程序写多了,应该不至于犯主线程退出子线程还没结束的低级错误。
这篇博客
中详细的描写过了,所以不再多说。不过当时是用 C 语言写的,现在来了解下 C++ 的写法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
19 20 21 22 23 24 25
#include <iostream> #include <thread> #include <mutex> #include <chrono> int counter = 0 ;std::mutex mtx; void increase (int time) { for (int i = 0 ; i < time; i++) { mtx.lock (); std::this_thread::sleep_for (std::chrono::milliseconds (1 )); counter++; mtx.unlock (); } } int main (int argc, char ** argv) { std::thread t1 (increase, 100 ) ; std::thread t2 (increase, 100 ) ; t1.join (); t2.join (); std::cout << "counter:" << counter << std::endl; return 0 ; }
这个类
只有构造函数和析构函数,搭配
mutex
使用,在创建这个对象时传入锁,调用锁的
lock
函数;变量销毁会调用析构函数,此时调用锁的
unlock
函数,这也就是传说中的 RAII 机制
2
。
如下述程序
3
,避免一个线程意外退出没来得及释放锁,导致另一个线程无法获取资源而死锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
#include <iostream> #include <thread> #include <mutex> #include <chrono> #include <stdexcept> int counter = 0 ;std::mutex mtx; void increase_proxy (int time, int id) { for (int i = 0 ; i < time; i++) { std::lock_guard<std::mutex> lk (mtx) ; if (id == 1 ) { throw std::runtime_error ("throw excption...." ); } std::this_thread::sleep_for (std::chrono::milliseconds (1 )); counter++; } } void increase (int time, int id) { try { increase_proxy (time, id); } catch (const std::exception& e){ std::cout << "id:" << id << ", " << e.what () << std::endl; } } int main (int argc, char ** argv) { std::thread t1 (increase, 100 , 1 ) ; std::thread t2 (increase, 100 , 2 ) ; t1.join (); t2.join (); std::cout << "counter:" << counter << std::endl; return 0 ; }
4
。此外,为了防止没有锁定或提前释放互斥量导致危险,可以使用
lock_guard
并传入
std::adopt_lock
,前者保证当变量销毁时释放互斥量,后者保证线程已经上锁成功时不再调用
lock()
函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
#include <mutex> #include <thread> struct bank_account { explicit bank_account (int balance) : balance(balance) { } int balance; std::mutex m; }; void transfer (bank_account &from, bank_account &to, int amount) { if (&from == &to) return ; std::lock (from.m, to.m); std::lock_guard<std::mutex> lock1 (from.m, std::adopt_lock) ; std::lock_guard<std::mutex> lock2 (to.m, std::adopt_lock) ; from.balance -= amount; to.balance += amount; } int main () { bank_account my_account (100 ) ; bank_account your_account (50 ) ; std::thread t1 (transfer, std::ref(my_account), std::ref(your_account), 10 ) ; std::thread t2 (transfer, std::ref(your_account), std::ref(my_account), 5 ) ; t1.join (); t2.join (); return 0 ; }
除了
adopt_lock
之外,还有
try_to_lock
,
defer_lock
,他们都有不同的应用场景,还可以配合使用:
1 2 3 4 5 6 7 8 9 10 11
void print_block (int n, char c) { std::unique_lock<std::mutex> my_lock (mtx, std::defer_lock) ; if (my_lock.try_lock ()) { for (int i = 0 ; i < n; ++i) std::cout << c; std::cout << '\n' ; } }
其他锁的内容实在是太多了,还有时间锁、递归锁、
lock_unique
,读写锁的
shared_lock
等等,等哪天用到在整理这些,这里只整理最简单的,详情可以参考 cppreference
5
。
C
语言风格的代码
。
当然,如果按照
C++
的写法,我们发现条件变量的
wait
方法有
两个参数
,第二个参数用于接受一个变量,如果继续等待,那么那个变量的取值是
false
,如果不需等待,那么那个变量返回
true
。
1
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
#include <iostream> #include <mutex> #include <thread> #include <condition_variable> std::mutex g_mutex; std::condition_variable g_cond; int g_i = 0 ;bool g_running = false ;void ThreadFunc (int n) { for (int i = 0 ; i < n; ++i) { { std::lock_guard<std::mutex> lock (g_mutex) ; ++g_i; std::cout << "plus g_i by func thread " << std::this_thread::get_id () << std::endl; } } std::unique_lock<std::mutex> lock (g_mutex) ; std::cout << "wait for exit" << std::endl; g_cond.wait (lock, [=] {return g_running;}); std::cout << "func thread exit" << std::endl; } int main () { int n = 100 ; std::thread t1 (ThreadFunc, n) ; std::this_thread::sleep_for (std::chrono::seconds (1 )); for (int i = 0 ; i < n; ++i) { { std::lock_guard<std::mutex> lock (g_mutex) ; ++g_i; std::cout << "plus g_i by main thread " << std::this_thread::get_id () << std::endl; } } { std::lock_guard<std::mutex> lock (g_mutex) ; g_running = true ; g_cond.notify_one (); } t1.join (); std::cout << "g_i = " << g_i << std::endl; }
输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
plus g_i by func thread 140476623930944 plus g_i by func thread 140476623930944 wait for exit // 表示子线程等待唤醒 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 plus g_i by main thread 140476623935296 func thread exit // 子线程被唤醒 g_i = 200
6
。如果想实现信号量,可以通过互斥量和条件变量来实现。而关于信号量和互斥量的区别,在
这篇文章
中已经写明了。那么来实现一个信号量的类
7
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
#include <iostream> #include <mutex> #include <thread> #include <vector> #include <condition_variable> class Semaphore {private : std::mutex mutex_; std::condition_variable cv_; int count_; public : explicit Semaphore (int count = 0 ) : count_(count) { } void Signal () { std::unique_lock<std::mutex> lock (mutex_) ; ++count_; cv_.notify_one (); } void Wait () { std::unique_lock<std::mutex> lock (mutex_) ; cv_.wait (lock, [=] { return count_ > 0 ; }); --count_; } }; std::string FormatTimeNow (const char * format) { auto now = std::chrono::system_clock::now (); std::time_t now_c = std::chrono::system_clock::to_time_t (now); std::tm* now_tm = std::localtime (&now_c); char buf[20 ]; std::strftime (buf, sizeof (buf), format, now_tm); return std::string (buf); } Semaphore g_semaphore (3 ) ;std::mutex g_io_mutex; void Worker () { g_semaphore.Wait (); std::thread::id thread_id = std::this_thread::get_id (); std::string now = FormatTimeNow ("%H:%M:%S" ); { std::lock_guard<std::mutex> lock (g_io_mutex) ; std::cout << "Thread " << thread_id << ": wait succeeded" << " (" << now << ")" << std::endl; } std::this_thread::sleep_for (std::chrono::seconds (1 )); g_semaphore.Signal (); } int main () { std::vector<std::thread> v; for (std::size_t i = 0 ; i < 3 ; ++i) { v.emplace_back (&Worker); } for (std::thread& t : v) { t.join (); } return 0 ; }
信号量的值为 3,表示能同时申请 3 个资源
当一个线程申请资源后,即执行了
wait
操作,
count_
取值递减,表示有一个资源被占用
当
count_
取值小于 0 时,调用条件变量的
wait
方法,当先线程等待有了资源被唤醒
当一个线程释放资源后,执行了
signal
操作,
count_
取值递增,表示有一个资源被释放,并执行
notify_one
方法,即唤醒一个等待的线程