前言
最近在读陈硕的moduo网络库的书,记录总结一些东西。在并发编程中,有两种基本模型,一种是message passing,另外一种是Shared memory。这一章主要讲述shared memory模型下线程同步需要注意的一些问题。作者陈硕推荐尽可能使用message passing,因为其更容易保证程序的正确性,并且可以移植到分布式系统中,扩容性更强。但是多线程编程仍然需要程序员了解shared memory情形,以备不时之需。
本章题名为“线程同步精要”,又在文章开头提到并发编程的两种基本模型,私以为,线程同步精要指的单单是shared memory,只有线程间共享的资源,才需要采取同步措施来保证资源的正确访问。本章作者分享的是他个人在线程同步方面的经验与心得。
线程同步的四项原则
文中按照重要性先后列出线程同步的四项原则
- 首要原则是尽量最低限度地共享对象,减少需要同步的场合。一个对象能不暴露给别的线程就不要暴露;如果要暴露,优先考虑immutable对象;实在不行才暴露可修改的对象,并用同步措施来充分的保护它。
- 其次是使用高级的并发编程构件,如TaskQueue、Producer-Consumer Queue、CountDownLatch等等。
- 最后不得已必须使用底层同步原语(primitives)时,只用非递归的互斥器和条件变量,慎用读写锁,不要用信号量。
- 除了使用atomic整数之外,不自己编写lock-free代码,也不要用“内核级”同步原语。不凭空猜测“哪种做法性能更好”,比如“spin lock vs.
原则第三条
互斥器
互斥器,很简单,就是RAII收发,常见的MutexLock和MutexLockGuard手法,MutexLock负责锁的创建和销毁以及判断是否被锁,MutexLockGuard负责加锁解锁(即临界区的进入和退出)。
作者推荐使用非递归锁,因为同一个线程中多次对非递归锁,会立刻导致死锁,这可以帮助我们思考代码对锁的期求,并且及早(在编码阶段)发现问题。作者举出的典型情况如下(https://github.com/chenshuo/recipes/blob/master/thread/test/NonRecursiveMutex_test.cc):
MutexLock mutex; std::vector<Foo> foos; void post(const Foo& f) { MutexLockGuard lock(mutex); foos.push_back(f); } void traverse() { MutexLockGuard lock(mutex); for (std::vector<Foo>::const_iterator it = foos.begin(); it != foos.end(); ++it) { it->doit(); } } void Foo::doit() const { Foo f; post(f); }
显然traverse递归foos,并逐个调用doit,doit里面调用post,进而造成单个锁的死锁。如果是递归的,则push_back可能导致外围的vector迭代器失效,进而导致程序crash,这种偶尔性的情况会让程序员抓耳挠腮去想到底程序为啥会崩。如果是非递归锁,则直接死锁,并且gdb调试 `thread apply all bt`查看各个线程的调用栈,就很容易查找到在哪里死锁,进而修复问题。
如果确实需要在遍历的时候修改vector,做法有二:
- 修改推后,记住需要增删的元素,遍历结束再统一修改
- copy-on-write,对副本进行操作,再加锁赋值回去(见书中2.8节所举实例,非常有代表性)
死锁案例分析
文中描述了单个线程死锁和两个线程间死锁的例子和调试过程。如下:
单个线程死锁(https://github.com/chenshuo/recipes/blob/master/thread/test/SelfDeadLock.cc)
#include "../Mutex.h" class Request { public: void process() // __attribute__ ((noinline)) { muduo::MutexLockGuard lock(mutex_); print(); } void print() const // __attribute__ ((noinline)) { muduo::MutexLockGuard lock(mutex_); } private: mutable muduo::MutexLock mutex_; }; int main() { Request req; req.process(); }
两个线程死锁(https://github.com/chenshuo/recipes/blob/master/thread/test/MutualDeadLock.cc)
#include "../Mutex.h" #include "../Thread.h" #include <set> #include <stdio.h> class Request; class Inventory { public: void add(Request* req) { muduo::MutexLockGuard lock(mutex_); requests_.insert(req); } void remove(Request* req) __attribute__ ((noinline)) { muduo::MutexLockGuard lock(mutex_); requests_.erase(req); } void printAll() const; private: mutable muduo::MutexLock mutex_; std::set<Request*> requests_; }; Inventory g_inventory; class Request { public: void process() // __attribute__ ((noinline)) { muduo::MutexLockGuard lock(mutex_); g_inventory.add(this); // ... } ~Request() __attribute__ ((noinline)) { muduo::MutexLockGuard lock(mutex_); sleep(1); g_inventory.remove(this); } void print() const __attribute__ ((noinline)) { muduo::MutexLockGuard lock(mutex_); // ... } private: mutable muduo::MutexLock mutex_; }; void Inventory::printAll() const { muduo::MutexLockGuard lock(mutex_); sleep(1); for (std::set<Request*>::const_iterator it = requests_.begin(); it != requests_.end(); ++it) { (*it)->print(); } printf("Inventory::printAll() unlocked\n"); } /* void Inventory::printAll() const { std::set<Request*> requests { muduo::MutexLockGuard lock(mutex_); requests = requests_; } for (std::set<Request*>::const_iterator it = requests.begin(); it != requests.end(); ++it) { (*it)->print(); } } */ void threadFunc() { Request* req = new Request; req->process(); delete req; } int main() { muduo::Thread thread(threadFunc); thread.start(); usleep(500 * 1000); g_inventory.printAll(); thread.join(); }
条件变量
书中给出条件变量正确使用的方式
- 必须与mutex一起使用,该布尔表达式的读写受此mutex保护
- 在mutex已上锁的时候才能调用wait()
- 把判断布尔条件和wait()放到while循环中
其中,第三点是为了防止虚假唤醒(spurious wakeup)
另外,我觉得需要注意的一点是,wait会有一个加解锁的过程,即原子性地unlock mutex进入等待,wait执行完毕后自动重新加锁。
条件变量和互斥器组合举例(高级并发构件)
BlockQueue
书中使用条件变量自制muduo的BlockQueue,见代码https://github.com/chenshuo/muduo/blob/master/muduo/base/BlockingQueue.h,关键代码如下:
void put(const T& x) { MutexLockGuard lock(mutex_); queue_.push_back(x); notEmpty_.notify(); // wait morphing saves us // http://www.domaigne.com/blog/computing/condvars-signal-with-mutex-locked-or-not/ } T take() { MutexLockGuard lock(mutex_); // always use a while-loop, due to spurious wakeup while (queue_.empty()) { notEmpty_.wait(); } assert(!queue_.empty()); T front(std::move(queue_.front())); queue_.pop_front(); return std::move(front); }
CountDownLatch
CountDownLatch主要有两种用途:
- 主线程发起多个子线程,等这些子线程各自都完成一定任务后,主线程才继续执行。通常用于主线程等待多个子线程完成初始化
- 主线程发起多个子线程,子线程都等待主线程,主线程完成一些其他任务之后,通知所有子线程开始执行。通常用于多个子线程等待主线程发出“起跑”命令
互斥器和条件变量构成了多线程编程的全部必备同步原语,用他们即可完成低级的同步过程,或者编写高级的同步组件,作者认为我们应该熟练掌握这个同步原语的用法,再在必要的时候去考虑其他手段提高性能。
禁止使用读写锁和信号量
文中叙述了种种不待见读写锁和信号量的原因。反正我是两个都不怎么用。。。读写锁没了解过,信号量之前有考虑用过,后来使用条件变量的方案替代掉了。
读写锁的问题有:
- 正确性:程序员容易不小心在read lock保护的函数中调用修改状态的函数
- 性能:读写锁开销总是大于等于mutex
- 读锁如果可以提升为写锁,那么可能会造成可重入锁一样的问题
信号量的问题有:
- semaphore has no notion of ownership
- 时刻保持信号量与我们自己数据结构长度值的一致,增加了程序猿的负担和出错的可能
总结来说就是不要用,而条件变量和互斥器完全可以替代这些东西
线程安全的Singleton实现
书中给出lazy-initialization的线程安全的做法
https://github.com/chenshuo/muduo/blob/master/muduo/base/Singleton.h
借shared_ptr实现copy-on-write
看完这小节的例子,收获很大,对前面知识的理解也深了一些。总结来说,这节核心知识点有三:
- 如何加锁解锁来避免死锁的问题
- 不管读或者写,都应该对shared_ptr加锁
- 写时,可以先加锁拷贝变量副本,然后修改变量副本。读时,可以先临界区拷贝变量到函数局部变量,再操作函数局部变量来避免死锁。copy-on-write或者read-and-update是很有用的技术,前者帮助避免修改正在读的数据,后者帮助避免死锁或者读正在修改的数据。
后记
扩展阅读
《Read-World Concurrrency》https://queue.acm.org/detail.cfm?id=1454462
虚假唤醒 https://en.wikipedia.org/wiki/Spurious_wakeup
- 主线程发起多个子线程,等这些子线程各自都完成一定任务后,主线程才继续执行。通常用于主线程等待多个子线程完成初始化
没有帐号?立即注册