无锁(lock-free)数据结构

 提到并行计算通常都会想到加锁,事实却并非如此,大多数并发是不需要加锁的。比如在不同电脑上运行的代码编辑器,两者并发运行不需要加锁。在一台电脑上同时运行的媒体播放放器和代码编辑器,两者并发运行不需要加锁(当然系统调用和进程调度是要加锁的)。在同一个进程中运行多个线程,如果各自处理独立的事情也不需要加锁(当然系统调用、进程调度和内存分配是要加锁的)。在以上这些情况里,各个并发实体之间没有共享数据,所以虽然并发运行但不需要加锁。

多线程并发运行时,虽然有共享数据,如果所有线程只是读取共享数据而不修改它,也是不用加锁的,比如代码段就是共享的“数据”,每个线程都会读取,但是不用加锁。排除所有这些情况,多线程之间有共享数据,有的线程要修改这些共享数据,有的线程要读取这些共享数据,这才是程序员需要关注的情况,也是本节我们讨论的范围。

在并发的环境里,加锁可以保护共享的数据,但是加锁也会存在一些问题:

  • 由于临界区无法并发运行,进入临界区就需要等待,加锁带来效率的降低。
  • 在复杂的情况下,很容易造成死锁,并发实体之间无止境的互相等待。
  • 在中断/信号处理函数中不能加锁,给并发处理带来困难。
  • 优先级倒置造成实时系统不能正常工作。低级优先进程拿到高优先级进程需要的锁,结果是高/低优先级的进程都无法运行,中等优先级的进程可能在狂跑。

由于并发与加锁(互斥)的矛盾关系,无锁数据结构自然成为程序员关注的焦点,这也是本节要介绍的:

      CPU提供的原子操作

大约在七八年前,我们用apache的xerces来解析XML文件,奇怪的是多线程反而比单线程慢。他们找了很久也没有找出原因,只是证实使用多进程代替多线程会快一个数量级,在Windows上他们就使用了多进程的方式。后来移植到linux时候,我发现xerces每创建一个结点都会去更新一些全局的统计信息,比如把结点的总数加一,它使用的pthread_mutex实现互斥。这就是问题所在:一个XML文档有数以万计的结点,以50个线程并发为例,每个线程解析一个XML文档,总共要进行上百万次的加锁/解锁,几乎所有线程都在等待,你说能快得了吗?

当时我知道Windows下有InterlockedIncrement之类的函数,它们利用CPU一些特殊指令,保证对整数的基本操作是原子的。查找了一些资源发现Linux下也有类似的函数,后来我把所有加锁去掉,换成这些原子操作,速度比多进程运行还快了几倍。下面我们看++和—的原子操作在IA架构上的实现:

#define ATOMIC_SMP_LOCK "lock ; "
typedef struct { volatile int counter; } atomic_t;

static __inline__ void atomic_inc(atomic_t *v)
{
    __asm__ __volatile__(
        ATOMIC_SMP_LOCK "incl %0"
        :"=m" (v->counter)
        :"m" (v->counter));
}

static __inline__ void atomic_dec(atomic_t *v)
{
    __asm__ __volatile__(
        ATOMIC_SMP_LOCK "decl %0"
        :"=m" (v->counter)
        :"m" (v->counter));
}

单入单出的循环队列。单入单出的循环队列是一种特殊情况,虽然特殊但是很实用,重要的是它不需要加锁。这里的单入是指只有一个线程向队列里追加数据(push),单出只是指只有一个线程从队列里取数据(pop),循环队列与普通队列相比,不同之处在于它的最大数据储存量是事先固定好的,不能动态增长。尽管有这些限制它的应用还是相当广泛的。这我们介绍一下它的实现:

数据下定义如下:

typedef struct _FifoRing
{
    int r_cursor;
    int w_cursor;
    size_t length;
    void* data[0];

}FifoRing;

r_cursor指向队列头,用于取数据(pop)。w_cursor指向队列尾,用于追加数据(push)。length表示队列的最大数据储存量,data表示存放的数据,[0]在这里表示变长的缓冲区(前面我们已经讲过)。

创建函数

FifoRing* fifo_ring_create(size_t length)
{
    FifoRing* thiz = NULL;

    return_val_if_fail(length > 1, NULL);

    thiz = (FifoRing*)malloc(sizeof(FifoRing) + length * sizeof(void*));

    if(thiz != NULL)
    {
        thiz->r_cursor = 0;
        thiz->w_cursor = 0;
        thiz->length   = length;
    }

    return thiz;
}

这里我们要求队列的长度大于1而不是大于0,为什么呢?排除长度为1的队列没有什么意义的原因外,更重要的原因是队列头与队列尾重叠 (r_cursor= =w_cursor) 时,到底表示是满队列还是空队列?这个要搞清楚才行,上次一个同事犯了这个错误,让我们查了很久。这里我们认为队列头与队列尾重叠时表示队列为空,这与队列初始状态一致,后面在写的时候始终保留一个空位,避免队列头与队列尾重叠,这样可以消除歧义了。

追加数据(push)

Ret fifo_ring_push(FifoRing* thiz, void* data)
{
    int w_cursor = 0;
    Ret ret = RET_FAIL;
    return_val_if_fail(thiz != NULL, RET_FAIL);

    w_cursor = (thiz->w_cursor + 1) % thiz->length;

    if(w_cursor != thiz->r_cursor)
    {
        thiz->data[thiz->w_cursor] = data;
        thiz->w_cursor = w_cursor;

        ret = RET_OK;
    }

    return ret;
}

队列头和队列尾之间还有一个以上的空位时就追加数据,否则返回失败。

取数据(pop)

Ret fifo_ring_pop(FifoRing* thiz, void** data)
{
    Ret ret = RET_FAIL;
    return_val_if_fail(thiz != NULL && data != NULL, RET_FAIL);

    if(thiz->r_cursor != thiz->w_cursor)
    {
        *data = thiz->data[thiz->r_cursor];
        thiz->r_cursor = (thiz->r_cursor + 1)%thiz->length;

        ret = RET_OK;
    }

    return ret;
}

队列头和队列尾不重叠表示队列不为空,取数据并移动队列头。

      单写多读的无锁数据结构
      
单写表示只有一个线程去修改共享数据结构,多读表示有多个线程去读取共享数据结构。前面介绍的读写锁可以有效的解决这个问题,但更高效的办法是使用无锁数据结构。思路如下:

就像为了避免显示闪烁而使用的双缓冲一样,我们使用两份数据结构,一份数据结构用于读取,所有线程都可以在不加锁的情况下读取这个数据结构。另外一份数据结构用于修改,由于只有一个线程会修改它,所以也不用加锁。

在修改之后,我们再交换读/写的两个函数结构,把另外一份也修改过来,这样两个数据结构就一致了。在交换时要保证没有线程在读取,所以我们还需要一个读线程的引用计数。现在我们看看怎么把前面写的双向链表改为单写多读的无锁数据结构。

为了保证交换是原子的,我们需要一个新的原子操作CAS(compare and swap)。

#define CAS(_a, _o, _n)                                    \
({ __typeof__(_o) __o = _o;                                \
   __asm__ __volatile__(                                   \
       "lock cmpxchg %3,%1"                                \
       : "=a" (__o), "=m" (*(volatile unsigned int *)(_a)) \
       :  "0" (__o), "r" (_n) );                           \
   __o;                                                    \
})

数据结构

typedef struct _SwmrDList
{
    atomic_t rd_index_and_ref;
    DList* dlists[2];
}SwmrDList;

两个链表,一个用于读一个用于写。rd_index_and_ref的最高字节记录用于读取的双向链表的索引,低24位用于记录读取线程的引用记数,最大支持16777216个线程同时读取,应该是足够了,所以后面不考虑它的溢出。

读取操作

int      swmr_dlist_find(SwmrDList* thiz, DListDataCompareFunc cmp, void* ctx)
{
    int ret = 0;
    return_val_if_fail(thiz != NULL && thiz->dlists != NULL, -1);

    atomic_inc(&(thiz->rd_index_and_ref));
    size_t rd_index = (thiz->rd_index_and_ref.counter>>24) & 0x1;
    ret = dlist_find(thiz->dlists[rd_index], cmp, ctx);
    atomic_dec(&(thiz->rd_index_and_ref));

    return ret;
}

修改操作

Ret swmr_dlist_insert(SwmrDList* thiz, size_t index, void* data)
{
    Ret ret = RET_FAIL;
    DList* wr_dlist = NULL;
    return_val_if_fail(thiz != NULL && thiz->dlists != NULL, ret);

    size_t wr_index = !((thiz->rd_index_and_ref.counter>>24) & 0x1);
    if((ret = dlist_insert(thiz->dlists[wr_index], index, data)) == RET_OK)
    {
        int rd_index_old = thiz->rd_index_and_ref.counter & 0xFF000000;
        int rd_index_new = wr_index << 24;

        do
        {
            usleep(100);
        }while(CAS(&(thiz->rd_index_and_ref), rd_index_old, rd_index_new));

        wr_index = rd_index_old>>24;
        ret = dlist_insert(thiz->dlists[wr_index], index, data);
    }

    return ret;
}

先修改用于修改的双向链表,修改完成之后等到没有线程读取时,交换读/写两个链表,再修改另一个链表,此时两个链表状态保持一致。

稍做改进,对修改的操作进行加锁,就可以支持多读多写的数据结构,读是无锁的,写是加锁的。

      真正的无锁数据结构
      Andrei Alexandrescu的《Lock-FreeDataStructures》估计是这方面最经典的论文了,对他的方法我开始感到惊奇后来感到失望,惊奇的是算法的巧妙,失望的是无锁的限制和代价。作者最后说这种数据结构只适用于WRRMBNTM(Write-Rarely-Read-Many -But-Not-Too-Many)的情况。而且每次修改都要拷贝整个数据结构(甚至多次),所以不要指望这种方法能带来多少性能上的提高,唯一的好处是能避免加锁带来的部分副作用。有兴趣的朋友可以看下这篇论文,这里我就不重复了。

时间: 2024-09-11 22:52:46

无锁(lock-free)数据结构的相关文章

《OpenACC并行程序设计:性能优化实践指南》一 1.5 无锁编程

1.5 无锁编程 互斥锁是用于同步进程或线程的常用机制,这些进程或线程需要访问并行程序中的一些共享资源.互斥锁就像它们名字所说的:如果一个线程锁住了资源,另一个线程希望访问它需要等待第一个线程解锁这个资源.一旦资源被解锁,第二个线程在处理这个资源时会一直锁住它.程序的线程必须遵守:一旦使用完共享资源尽快解锁,以保持程序执行流程. 由于OpenACC中没有锁,编程人员需要熟悉无锁编程和数据结构的概念.无锁方法保证至少一个执行该方法的线程的进展.可能存在某些线程可以被延迟的情况,但是保证至少一个线程

无锁数据结构(Lock-Free Data Structures)

原文:无锁数据结构(Lock-Free Data Structures) 一个星期前,我写了关于SQL Server里闩锁(Latches)和自旋锁(Spinlocks)的文章.2个同步原语(synchronization primitives)是用来保护SQL Server里的共享数据结构,例如缓存池里的页(通过闩锁(Latches)),锁管理器哈希表里的锁(通过自旋锁(Spinlock)).接下里你会看到越来越多的全新同步原语(synchronization primitives),即所谓的

无锁和无等待的定义和例子

原文链接,译文连接,译者:周可人,校对:梁海舰 在查阅google之后,我发现没有一处对并发算法或是数据结构规定的演进条件(progress condition,注:参考[1],译者认为翻译为演进状态更为合适)做合理的解释.甚至在"The Art of Multiprocessor Programming"中也只有围绕书本的一小段定义,大部分定义是单行的句子,因而造成了我们普通人含义模糊的理解,所以在这里我把我对这些概念的理解整理在一起,并且在每一种概念后面给出相应的例子. 我们先将演

透过 Linux 内核看无锁编程

非阻塞型同步 (Non-blocking Synchronization) 简介 如何正确有效的保护共享数据是编写并行程序必须面临的一个难题,通常的手段就是同步.同步可分为阻塞型同步(Blocking Synchronization)和非阻塞型同步( Non-blocking Synchronization). 阻塞型同步是指当一个线程到达临界区时,因另外一个线程已经持有访问该共享数据的锁,从而不能获取锁资源而阻塞,直到另外一个线程释放锁.常见的同步原语有 mutex.semaphore 等.如

使用CAS实现无锁的SkipList

感谢同事[付哲]发布此文. 无锁 并发环境下最常用的同步手段是互斥锁和读写锁,例如pthread_mutex和pthread_readwrite_lock,常用的范式为: void ConcurrencyOperation() { mutex.lock(); // do something mutex.unlock(); } 这种方法的优点是: 编程模型简单,如果小心控制上锁顺序,一般来说不会有死锁的问题: 可以通过调节锁的粒度来调节性能. 缺点是: 所有基于锁的算法都有死锁的可能: 上锁和解锁

并发编程(三): 使用C++11实现无锁stack(lock-free stack)

前几篇文章,我们讨论了如何使用mutex保护数据及使用使用condition variable在多线程中进行同步.然而,使用mutex将会导致一下问题: 等待互斥锁会消耗宝贵的时间 - 有时候是很多时间.这种延迟会损害系统的scalability.尤其是在现在可用的core越多越多的情况下. 低优先级的线程可以获得互斥锁,因此阻碍需要同一互斥锁的高优先级线程.这个问题称为优先级倒置(priority inversion ). 可能因为分配的时间片结束,持有互斥锁的线程被取消调度.这对于等待同一互

PgSQL · 源码分析 · PG中的无锁算法和原子操作应用一则

原子操作概述 近年来随着服务器上CPU核数的不断增加,无锁算法(Lock Free)越来越广泛的被应用于高并发的系统中.PostgreSQL 做为世界上最高级开源数据库也在9.5时引入了无锁算法.本文先介绍了无锁算法和原子操作在PostgreSQL中的具体实现, 再通过一个Patch来看一下在PostgreSQL中是如何利用它来解决实际的高并发问题的. 无锁算法是利用CPU的原子操作实现的数据结构和算法来解决原来只能用锁才能解决的并发控制问题. 众所周知,在一个并发系统中特别是高并发的场景下,锁

无锁有序链表的实现

无锁有序链表可以保证元素的唯一性,使其可用于哈希表的桶,甚至直接作为一个效率不那么高的map.普通链表的无锁实现相对简单点,因为插入元素可以在表头插,而有序链表的插入则是任意位置. 本文主要基于论文High Performance Dynamic Lock-Free Hash Tables实现.   主要问题 链表的主要操作包含insert和remove,先简单实现一个版本,就会看到问题所在,以下代码只用作示例: struct node_t { key_t key; value_t val; n

C/C++中无锁有序链表的实现例子

主要问题 链表的主要操作包含insert和remove,先简单实现一个版本,就会看到问题所在,以下代码只用作示例: struct node_t {         key_t key;         value_t val;         node_t *next;     };     int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {         node_t *pred = head