使用固定的获取锁的顺序可以很大程度上避免死锁.,不过这有时候并不好控制.

首先,什么是死锁:

死锁是指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通信而造成的一种阻塞的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。

一个对线程中的每一个都需要同时锁定两个互斥元来执行一些操作,并且每个线程都拥有了一个互斥元,同时等待另外一个.线程都无法继续,因为每个线程都在等待另一个释放其互斥元.这种情景称为死锁(deadlock).它是在需要锁定两个或更多互斥元以执行操作时最大问题

那么死锁什么情况下才会出现呢?

虽然进程在运行过程中,可能发生死锁,但死锁的发生也必须具备一定的条件,死锁的发生必须具备以下四个必要条件:

  1. 互斥条件:指进程对所分配到的资源进行排它性使用,即在一段时间内某资源只由一个进程占用。如果此时还有其它进程请求资源,则请求者只能等待,直至占有资源的进程用毕释放。
  2. 请求和保持条件:指进程已经保持至少一个资源,但又提出了新的资源请求,而该资源已被其它进程占有,此时请求进程阻塞,但又对自己已获得的其它资源保持不放。
  3. 不剥夺条件:指进程已获得的资源,在未使用完之前,不能被剥夺,只能在使用完时由自己释放。
  4. 环路等待条件:指在发生死锁时,必然存在一个进程——资源的环形链,即进程集合{P0,P1,P2,···,Pn}中的P0正在等待一个P1占用的资源;P1正在等待P2占用的资源,……,Pn正在等待已被P0占用的资源。

现在,我们来看一个最简单的关于死锁的例子:

int main()
{
    thread t1;
    thread t2;
    t1 = thread([&t2] {
        cout << "thread-1 start!" << endl;
        std::chrono::seconds dura(2);
        std::this_thread::sleep_for(dura);
        t2.join();
        cout << "thread-1 finished!" << endl;
    });
    t2 = thread([&t1] {
        cout << "thread-2 start!" << endl;
        std::chrono::seconds dura(2);
        std::this_thread::sleep_for(dura);
        t1.join();
        cout << "thread-2 finished!" << endl;
    });
    system("pause");
    return 0; 
}

 

在上面的例子中,我们启动两个线程t1和t2并让t1线程等待t2执行完成后才执行,同时t2线程又要等待t1线程执行完成才能执行,造成了死锁.这说明死锁并不仅仅产生于锁定,虽然这是最常见的诱因.

更进一步的,我们来看一个最常见的死锁:

class X 
{
private:
    int data;
    mutex m;
public:
    X(int _data) :data(_data) { }
    void lock() { m.lock(); }
    void unlock() { m.unlock(); }
    void showData()
    {
        cout << data << endl;
    }
    friend void swap(X& lhs, X& rhs) 
    {
        if (&lhs == &rhs)
        {
            return;
        }
        /*
        lock_guard<mutex> lock_a(lhs.m);

        cout << "locked" << lhs.data << endl;
        std::chrono::seconds dura(1);
        std::this_thread::sleep_for(dura);

        lock_guard<mutex> lock_b(rhs.m);
        cout << "locked" << rhs.data << endl;
        */

        // std::lock 可以同时锁定两个或更多的互斥元,而没有死锁的风险
        // 任何一个锁定失败都释放已经锁定的互斥量
        std::lock(lhs.m, rhs.m);
        // 提供一个额外参数std::adopt_lock 告知该互斥元已经被锁定
        lock_guard<mutex> lock_a(lhs.m, adopt_lock);
        lock_guard<mutex> lock_b(rhs.m, adopt_lock);
        swap(lhs.data, rhs.data);

    }
};

 

上述例子中,注释部分就是产生死锁的关键因素:X类有一个友元的交换函数,此时有两个X类型的变量x1,x2,有两个不同的线程t1,t2,在t1中调用了swap(x1,x2),而在t2中调用了swap(x2,x1),此时就会出现互相等待的情形,造成死锁.

解决方案则是使用std::lock来同时锁定两个或多个的锁.

接下来我们来总结一下:

我们发现:避免死锁的准则全都可以归结为一个思路,如果有另外一个线程有可能在等待你那你就别等它.这个独特的准则为识别和消除别的线程等待你的可能性提供了方法.下面给出一些常见的方法来避免死锁

  1. 避免嵌套锁:第一个思路最为简单,如果你已经持有了一个锁,就别再获取锁.如果你坚持这个准则,光凭使用锁是不可能导致死锁的,因为每个线程仅仅持有一个锁.你仍然会从其他事情(比如线程的相互等待)中得到死锁,但是互斥元锁定可能是死锁最常见的诱因.如果需要获取多个锁,为了避免死锁,就以std::lock的当个动作来执行.
  2. 在持有锁时,避免调用用户提供的代码:这是前面一条准则的简单后续,因为代码是用户提供的,你不知道它会做什么,它可能做包括锁在内的任何事情.如果你在持有锁时调用用户提供的代码,并且这段代码获取一个锁,你就违反的避免嵌套锁的准则,可能导致死锁.
  3. 以固定顺序获取锁:如果你绝对需要两个或更多的锁,并且不能以std::lock的单个操作取得,次优的做法是在每个线程中以相同的顺序获取它们.
  4. 使用层次锁:这其实是定义锁顺序的一个特例,也是我们将要讨论的.

使用层次锁来避免死锁:

虽然这实际上是定义锁定顺序的一个特例,但锁层次能提供一种方法,来检查运行时是否遵循约定.其思路是将应用程序分层,当代码试图锁定一个互斥元时,如果它在较低层已经持有锁定.那么就不允许它锁定该互斥元.下面是代码实现:

#include "iostream"
#include "stdexcept"
#include "thread"
#include "mutex"
#include "climits"
#include "thread_guard.h"
using namespace std;

class hierarchical_mutex {
private:
    std::mutex internal_mutex;
    std::mutex data_mutex;
    unsigned long const hierarchy_value;
    // 保存上一次的层次值,该锁不能重入,所以只需要一个简单数据类型来保存
    unsigned long previous_hierarchy_value;
    // 当前层次值
    static thread_local unsigned long this_thread_hierarchy_value;
    void check_for_hierarchy_violation() 
    {
        // 当前线程锁定了更低等级的或者是同等级的锁
        if (this_thread_hierarchy_value <= hierarchy_value)
        {
            throw std::logic_error("mutex hierarchy violated");
        }
    }
    void update_hierarchy_value() 
    {
        // 保存当前层次值
        previous_hierarchy_value = this_thread_hierarchy_value; 
        // 改变当前层次值
        this_thread_hierarchy_value = hierarchy_value; 
    }
public:
    explicit hierarchical_mutex(unsigned long value) 
        :hierarchy_value(value),previous_hierarchy_value(0){}
    void lock()
    {
        check_for_hierarchy_violation();
        internal_mutex.lock();
        lock_guard<std::mutex> lock(data_mutex);
        update_hierarchy_value();
    }
    void unlock()
    {
        internal_mutex.unlock();
        lock_guard<std::mutex> lock(data_mutex);
        this_thread_hierarchy_value = previous_hierarchy_value;

    }
    bool try_lock() 
    {
        check_for_hierarchy_violation();
        if (!internal_mutex.try_lock())
            return false;
        lock_guard<std::mutex> lock(data_mutex);
        update_hierarchy_value();
        return true; 
    }
};
thread_local unsigned long hierarchical_mutex::this_thread_hierarchy_value = ULONG_MAX;

struct Data {
    hierarchical_mutex m;
    int data; 
    Data(unsigned long hierarchical, int data) 
        :m(hierarchical), data(data) {}
    void swap(Data & d) 
    {
        lock_guard<hierarchical_mutex> lock1(m);
        std::chrono::seconds dura(1);
        std::this_thread::sleep_for(dura);
        lock_guard<hierarchical_mutex> lock2(d.m);
        std::swap(d.data, data);
    }
};

int main()
{
    Data d1(10000,1), d2(1000,2);
    {
        thread_guard t1(thread([&] {
            try
            {
                d1.swap(d2);
            }
            catch (const std::exception&)
            {
                cout << "operation not be permitted!" << endl;
            }

        }));
        thread_guard t2(thread([&] {
            try
            {
                d2.swap(d1);
            }
            catch (const std::exception&)
            {
                cout << "operation not be permitted!" << endl;
            }
        }));
    }
    cout << d1.data << endl;
    system("pause");
    return 0;
}

 

上面代码中,我们定义了一个hierarchical_mutex类作为层次锁的实现(实现特别简单,不予赘述),在main函数中,我们去尝试实验2的操作,此时将会抛出异常避免死锁(thread_guard是thread的一种RAII机制,为了避免线程忘记join).

如果锁是重入的呢,又改如何实现呢?

这里给出的思路是用一个栈来存储每一次的层次值,实现代码将用Java实现以链接方式给出:

https://github.com/Pouee/JavaSE/blob/master/src/thread/concurrent/cppcon/HierarchicalLockTest.java

将上述准则扩展到锁之外:

死锁不一定只是出现于锁定中,它可以发生在任何可以导致循环等待的同步结构中.因此,扩展上面锁述的这些准则来涵盖那些情况也是值得的.比如第一个实验,我们可以规定,所有线程的join都在主函数执行.

对于死锁的问题,我想我们不得不保证严谨的思路来考虑并发中的问题,特别是线程等待的问题.

分类: 编程