linux多线程服务端编程第二章-线程同步精要 读后感
? c++ ? ? 多线程 ?   发布于 2018-11-10   987人围观  0条评论
? c++ ? ? 多线程 ?   发表于 2018-11-10   987人围观  0条评论

前言

最近在读陈硕的moduo网络库的书,记录总结一些东西。在并发编程中,有两种基本模型,一种是message passing,另外一种是Shared memory。这一章主要讲述shared memory模型下线程同步需要注意的一些问题。作者陈硕推荐尽可能使用message passing,因为其更容易保证程序的正确性,并且可以移植到分布式系统中,扩容性更强。但是多线程编程仍然需要程序员了解shared memory情形,以备不时之需。

本章题名为“线程同步精要”,又在文章开头提到并发编程的两种基本模型,私以为,线程同步精要指的单单是shared memory,只有线程间共享的资源,才需要采取同步措施来保证资源的正确访问。本章作者分享的是他个人在线程同步方面的经验与心得。

 

线程同步的四项原则

文中按照重要性先后列出线程同步的四项原则

  1. 首要原则是尽量最低限度地共享对象,减少需要同步的场合。一个对象能不暴露给别的线程就不要暴露;如果要暴露,优先考虑immutable对象;实在不行才暴露可修改的对象,并用同步措施来充分的保护它。
  2. 其次是使用高级的并发编程构件,如TaskQueue、Producer-Consumer Queue、CountDownLatch等等。
  3. 最后不得已必须使用底层同步原语(primitives)时,只用非递归的互斥器和条件变量,慎用读写锁,不要用信号量。
  4. 除了使用atomic整数之外,不自己编写lock-free代码,也不要用“内核级”同步原语。不凭空猜测“哪种做法性能更好”,比如“spin lock vs. 
原则第一条就可以看出作者极力不推荐共享内存,笔者个人的经验是,共享资源给程序的编写和维护带来许多不便,与作者同感。
原则第二条是在解释,如果线程间需要共享某些资源,则最好采用高级组件避免繁复的与底层原语打交道的编程过程。个人在这里的经验是,采用高级组件可以提高开发效率。
原则第三条是若不得不使用底层原语,则最好只使用非递归的互斥器和条件变量,其他的不要用,文中针对第三点作了大篇幅的解释。

原则第三条

互斥器

互斥器,很简单,就是RAII收发,常见的MutexLock和MutexLockGuard手法,MutexLock负责锁的创建和销毁以及判断是否被锁,MutexLockGuard负责加锁解锁(即临界区的进入和退出)。

作者推荐使用非递归锁,因为同一个线程中多次对非递归锁,会立刻导致死锁,这可以帮助我们思考代码对锁的期求,并且及早(在编码阶段)发现问题。作者举出的典型情况如下(https://github.com/chenshuo/recipes/blob/master/thread/test/NonRecursiveMutex_test.cc):

MutexLock mutex;
std::vector<Foo> foos;

void post(const Foo& f)
{
  MutexLockGuard lock(mutex);
  foos.push_back(f);
}

void traverse()
{
  MutexLockGuard lock(mutex);
  for (std::vector<Foo>::const_iterator it = foos.begin();
      it != foos.end(); ++it)
  {
    it->doit();
  }
}

void Foo::doit() const
{
  Foo f;
  post(f);
}

显然traverse递归foos,并逐个调用doit,doit里面调用post,进而造成单个锁的死锁。如果是递归的,则push_back可能导致外围的vector迭代器失效,进而导致程序crash,这种偶尔性的情况会让程序员抓耳挠腮去想到底程序为啥会崩。如果是非递归锁,则直接死锁,并且gdb调试 `thread apply all bt`查看各个线程的调用栈,就很容易查找到在哪里死锁,进而修复问题。

如果确实需要在遍历的时候修改vector,做法有二:

  1. 修改推后,记住需要增删的元素,遍历结束再统一修改
  2. copy-on-write,对副本进行操作,再加锁赋值回去(见书中2.8节所举实例,非常有代表性)

死锁案例分析

文中描述了单个线程死锁和两个线程间死锁的例子和调试过程。如下:

单个线程死锁(https://github.com/chenshuo/recipes/blob/master/thread/test/SelfDeadLock.cc)

#include "../Mutex.h"

class Request
{
 public:
  void process() // __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    print();
  }

  void print() const // __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
  }

 private:
  mutable muduo::MutexLock mutex_;
};

int main()
{
  Request req;
  req.process();
}

两个线程死锁(https://github.com/chenshuo/recipes/blob/master/thread/test/MutualDeadLock.cc)

#include "../Mutex.h"
#include "../Thread.h"
#include <set>
#include <stdio.h>

class Request;

class Inventory
{
 public:
  void add(Request* req)
  {
    muduo::MutexLockGuard lock(mutex_);
    requests_.insert(req);
  }

  void remove(Request* req) __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    requests_.erase(req);
  }

  void printAll() const;

 private:
  mutable muduo::MutexLock mutex_;
  std::set<Request*> requests_;
};

Inventory g_inventory;

class Request
{
 public:
  void process() // __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    g_inventory.add(this);
    // ...
  }

  ~Request() __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    sleep(1);
    g_inventory.remove(this);
  }

  void print() const __attribute__ ((noinline))
  {
    muduo::MutexLockGuard lock(mutex_);
    // ...
  }

 private:
  mutable muduo::MutexLock mutex_;
};

void Inventory::printAll() const
{
  muduo::MutexLockGuard lock(mutex_);
  sleep(1);
  for (std::set<Request*>::const_iterator it = requests_.begin();
      it != requests_.end();
      ++it)
  {
    (*it)->print();
  }
  printf("Inventory::printAll() unlocked\n");
}

/*
void Inventory::printAll() const
{
  std::set<Request*> requests
  {
    muduo::MutexLockGuard lock(mutex_);
    requests = requests_;
  }
  for (std::set<Request*>::const_iterator it = requests.begin();
      it != requests.end();
      ++it)
  {
    (*it)->print();
  }
}
*/

void threadFunc()
{
  Request* req = new Request;
  req->process();
  delete req;
}

int main()
{
  muduo::Thread thread(threadFunc);
  thread.start();
  usleep(500 * 1000);
  g_inventory.printAll();
  thread.join();
}

条件变量

书中给出条件变量正确使用的方式

  1. 必须与mutex一起使用,该布尔表达式的读写受此mutex保护
  2. 在mutex已上锁的时候才能调用wait()
  3. 把判断布尔条件和wait()放到while循环中

其中,第三点是为了防止虚假唤醒(spurious wakeup)

另外,我觉得需要注意的一点是,wait会有一个加解锁的过程,即原子性地unlock mutex进入等待,wait执行完毕后自动重新加锁。

条件变量和互斥器组合举例(高级并发构件)

BlockQueue

书中使用条件变量自制muduo的BlockQueue,见代码https://github.com/chenshuo/muduo/blob/master/muduo/base/BlockingQueue.h,关键代码如下:

 

void put(const T& x)
  {
    MutexLockGuard lock(mutex_);
    queue_.push_back(x);
    notEmpty_.notify(); // wait morphing saves us
    // http://www.domaigne.com/blog/computing/condvars-signal-with-mutex-locked-or-not/
  }

  T take()
  {
    MutexLockGuard lock(mutex_);
    // always use a while-loop, due to spurious wakeup
    while (queue_.empty())
    {
      notEmpty_.wait();
    }
    assert(!queue_.empty());
    T front(std::move(queue_.front()));
    queue_.pop_front();
    return std::move(front);
  }

 

CountDownLatch

CountDownLatch主要有两种用途:

  1. 主线程发起多个子线程,等这些子线程各自都完成一定任务后,主线程才继续执行。通常用于主线程等待多个子线程完成初始化
  2. 主线程发起多个子线程,子线程都等待主线程,主线程完成一些其他任务之后,通知所有子线程开始执行。通常用于多个子线程等待主线程发出“起跑”命令

互斥器和条件变量构成了多线程编程的全部必备同步原语,用他们即可完成低级的同步过程,或者编写高级的同步组件,作者认为我们应该熟练掌握这个同步原语的用法,再在必要的时候去考虑其他手段提高性能。

禁止使用读写锁和信号量

文中叙述了种种不待见读写锁和信号量的原因。反正我是两个都不怎么用。。。读写锁没了解过,信号量之前有考虑用过,后来使用条件变量的方案替代掉了。

读写锁的问题有:

  1. 正确性:程序员容易不小心在read lock保护的函数中调用修改状态的函数
  2. 性能:读写锁开销总是大于等于mutex
  3. 读锁如果可以提升为写锁,那么可能会造成可重入锁一样的问题

信号量的问题有:

  1. semaphore has no notion of ownership
  2. 时刻保持信号量与我们自己数据结构长度值的一致,增加了程序猿的负担和出错的可能

总结来说就是不要用,而条件变量和互斥器完全可以替代这些东西

线程安全的Singleton实现

书中给出lazy-initialization的线程安全的做法

https://github.com/chenshuo/muduo/blob/master/muduo/base/Singleton.h

借shared_ptr实现copy-on-write

看完这小节的例子,收获很大,对前面知识的理解也深了一些。总结来说,这节核心知识点有三:

  1. 如何加锁解锁来避免死锁的问题
  2. 不管读或者写,都应该对shared_ptr加锁
  3. 写时,可以先加锁拷贝变量副本,然后修改变量副本。读时,可以先临界区拷贝变量到函数局部变量,再操作函数局部变量来避免死锁。copy-on-write或者read-and-update是很有用的技术,前者帮助避免修改正在读的数据,后者帮助避免死锁或者读正在修改的数据。

后记

扩展阅读

《Read-World Concurrrency》https://queue.acm.org/detail.cfm?id=1454462

 虚假唤醒 https://en.wikipedia.org/wiki/Spurious_wakeup

  1. 主线程发起多个子线程,等这些子线程各自都完成一定任务后,主线程才继续执行。通常用于主线程等待多个子线程完成初始化

上一篇: linux多线程服务端编程第三章-多线程服务器的适用场合与常用编程模型 读后感

下一篇: linux 打开 core dump

立即登录,发表评论
没有帐号?立即注册
0 条评论