C/C++中无锁有序链表的实现例子

主要问题

链表的主要操作包含insert和remove,先简单实现一个版本,就会看到问题所在,以下代码只用作示例:

struct node_t {
        key_t key;
        value_t val;
        node_t *next;
    };
    int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {
        node_t *pred = head;
        node_t *item = head->next;
        while (item) {
            int d = KEY_CMP(item->key, key);
            if (d >= 0) {
                *pred_ptr = pred;
                *item_ptr = item;
                return d == 0 ? TRUE : FALSE;
            }
            pred = item;
            item = item->next;
        } 
        *pred_ptr = pred;
        *item_ptr = NULL;
        return FALSE;
    }
    int l_insert(node_t *head, key_t key, value_t val) {
        node_t *pred, *item, *new_item;
        while (TRUE) {
            if (l_find(&pred, &item, head, key)) {
                return FALSE;
            }
            new_item = (node_t*) malloc(sizeof(node_t));
            new_item->key = key;
            new_item->val = val;
            new_item->next = item;
            // A. 如果pred本身被移除了
            if (CAS(&pred->next, item, new_item)) {
                return TRUE;
            }
            free(new_item);
        }
    }
    int l_remove(node_t *head, key_t key) {
        node_t *pred, *item;
        while (TRUE) {
            if (!l_find(&pred, &item, head, key)) {
                return TRUE;
            }
            // B. 如果pred被移除;如果item也被移除
            if (CAS(&pred->next, item, item->next)) {
                haz_free(item);
                return TRUE;
            }
        }
    }

l_find函数返回查找到的前序元素和元素本身,代码A和B虽然拿到了pred和item,但在CAS的时候,其可能被其他线程移除。甚至,在l_find过程中,其每一个元素都可能被移除。问题在于,任何时候拿到一个元素时,都不确定其是否还有效。元素的有效性包括其是否还在链表中,其指向的内存是否还有效。

解决方案

通过为元素指针增加一个有效性标志位,配合CAS操作的互斥性,就可以解决元素有效性判定问题。

因为node_t放在内存中是会对齐的,所以指向node_t的指针值低几位是不会用到的,从而可以在低几位里设置标志,这样在做CAS的时候,就实现了DCAS的效果,相当于将两个逻辑上的操作变成了一个原子操作。想象下引用计数对象的线程安全性,其内包装的指针是线程安全的,但对象本身不是。

CAS的互斥性,在若干个线程CAS相同的对象时,只有一个线程会成功,失败的线程就可以以此判定目标对象发生了变更。改进后的代码(代码仅做示例用,不保证正确):

typedef size_t markable_t;
    // 最低位置1,表示元素被删除
    #define HAS_MARK(p) ((markable_t)p & 0x01)
    #define MARK(p) ((markable_t)p | 0x01)
    #define STRIP_MARK(p) ((markable_t)p & ~0x01)
    int l_insert(node_t *head, key_t key, value_t val) {
        node_t *pred, *item, *new_item;
        while (TRUE) {
            if (l_find(&pred, &item, head, key)) { 
                return FALSE;
            }
            new_item = (node_t*) malloc(sizeof(node_t));
            new_item->key = key;
            new_item->val = val;
            new_item->next = item;
            // A. 虽然find拿到了合法的pred,但是在以下代码之前pred可能被删除,此时pred->next被标记
            //    pred->next != item,该CAS会失败,失败后重试
            if (CAS(&pred->next, item, new_item)) {
                return TRUE;
            }
            free(new_item);
        }
        return FALSE;
    }
    int l_remove(node_t *head, key_t key) {
        node_t *pred, *item;
        while (TRUE) {
            if (!l_find(&pred, &item, head, key)) {
                return FALSE;
            }
            node_t *inext = item->next;
            // B. 删除item前先标记item->next,如果CAS失败,那么情况同insert一样,有其他线程在find之后
            //    删除了item,失败后重试
            if (!CAS(&item->next, inext, MARK(inext))) {
                continue;
            }
            // C. 对同一个元素item删除时,只会有一个线程成功走到这里
            if (CAS(&pred->next, item, STRIP_MARK(item->next))) {
                haz_defer_free(item);
                return TRUE;
            }
        }
        return FALSE;
    }
    int l_find(node_t **pred_ptr, node_t **item_ptr, node_t *head, key_t key) {
        node_t *pred = head;
        node_t *item = head->next;
        hazard_t *hp1 = haz_get(0);
        hazard_t *hp2 = haz_get(1);
        while (item) {
            haz_set_ptr(hp1, pred);
            haz_set_ptr(hp2, item);
            /* 
             如果已被标记,那么紧接着item可能被移除链表甚至释放,所以需要重头查找
            */
            if (HAS_MARK(item->next)) { 
                return l_find(pred_ptr, item_ptr, head, key);
            }
            int d = KEY_CMP(item->key, key);
            if (d >= 0) {
                *pred_ptr = pred;
                *item_ptr = item;
                return d == 0 ? TRUE : FALSE;
            }
            pred = item;
            item = item->next;
        } 
        *pred_ptr = pred;
        *item_ptr = NULL;
        return FALSE;
    }

haz_get、haz_set_ptr之类的函数是一个hazard pointer实现,用于支持多线程下内存的GC。上面的代码中,要删除一个元素item时,会标记item->next,从而使得insert时中那个CAS不需要做任何调整。总结下这里的线程竞争情况:

insert中find到正常的pred及item,pred->next == item,然后在CAS前有线程删除了pred,此时pred->next == MARK(item),CAS失败,重试;删除分为2种情况:a) 从链表移除,得到标记,pred可继续访问;b) pred可能被释放内存,此时再使用pred会错误。为了处理情况b,所以引入了类似hazard pointer的机制,可以有效保障任意一个指针p只要还有线程在使用它,它的内存就不会被真正释放
insert中有多个线程在pred后插入元素,此时同样由insert中的CAS保证,这个不多说
remove中情况同insert,find拿到了有效的pred和next,但在CAS的时候pred被其他线程删除,此时情况同insert,CAS失败,重试
任何时候改变链表结构时,无论是remove还是insert,都需要重试该操作
find中遍历时,可能会遇到被标记删除的item,此时item根据remove的实现很可能被删除,所以需要重头开始遍历
ABA问题

ABA问题还是存在的,insert中:

if (CAS(&pred->next, item, new_item)) {

        return TRUE;

    }

如果CAS之前,pred后的item被移除,又以相同的地址值加进来,但其value变了,此时CAS会成功,但链表可能就不是有序的了。pred->val < new_item->val > item->val

为了解决这个问题,可以利用指针值地址对齐的其他位来存储一个计数,用于表示pred->next的改变次数。当insert拿到pred时,pred->next中存储的计数假设是0,CAS之前其他线程移除了pred->next又新增回了item,此时pred->next中的计数增加,从而导致insert中CAS失败。

// 最低位留作删除标志
    #define MASK ((sizeof(node_t) - 1) & ~0x01)
    #define GET_TAG(p) ((markable_t)p & MASK)
    #define TAG(p, tag) ((markable_t)p | (tag))
    #define MARK(p) ((markable_t)p | 0x01)
    #define HAS_MARK(p) ((markable_t)p & 0x01)
    #define STRIP_MARK(p) ((node_t*)((markable_t)p & ~(MASK | 0x01)))
remove的实现:
/* 先标记再删除 */
    if (!CAS(&sitem->next, inext, MARK(inext))) {
        continue;
    }
    int tag = GET_TAG(pred->next) + 1;
    if (CAS(&pred->next, item, TAG(STRIP_MARK(sitem->next), tag))) {
        haz_defer_free(sitem);
        return TRUE;
    }

insert中也可以更新pred->next的计数。

总结

无锁的实现,本质上都会依赖于CAS的互斥性。从头实现一个lock free的数据结构,可以深刻感受到lock free实现的tricky。最终代码可以从这里github获取。代码中为了简单,实现了一个不是很强大的hazard pointer

以上是小编为您精心准备的的内容,在的博客、问答、公众号、人物、课程等栏目也有的相关内容,欢迎继续使用右上角搜索按钮进行搜索指针
, 内存
, int
, 对象
, 线程
代码
有序链表、合并两个有序链表、两个有序单链表的合并、无锁链表、有序链表合并,以便于您获取更多的相关知识。

时间: 2024-08-03 20:22:55

C/C++中无锁有序链表的实现例子的相关文章

无锁有序链表的实现

无锁有序链表可以保证元素的唯一性,使其可用于哈希表的桶,甚至直接作为一个效率不那么高的map.普通链表的无锁实现相对简单点,因为插入元素可以在表头插,而有序链表的插入则是任意位置. 本文主要基于论文High Performance Dynamic Lock-Free Hash Tables实现.   主要问题 链表的主要操作包含insert和remove,先简单实现一个版本,就会看到问题所在,以下代码只用作示例: struct node_t { key_t key; value_t val; n

Java中无锁HashMap的原理与实现教程

在<Java出现HashMap的死循环的原因及解决方法>中,我们看到,java.util.HashMap并不能直接应用于多线程环境.对于多线程环境中应用HashMap,主要有以下几种选择: 使用线程安全的java.util.Hashtable作为替代. 使用java.util.Collections.synchronizedMap方法,将已有的HashMap对象包装为线程安全的. 使用java.util.concurrent.ConcurrentHashMap类作为替代,它具有非常好的性能.

使用CAS实现无锁的SkipList

感谢同事[付哲]发布此文. 无锁 并发环境下最常用的同步手段是互斥锁和读写锁,例如pthread_mutex和pthread_readwrite_lock,常用的范式为: void ConcurrencyOperation() { mutex.lock(); // do something mutex.unlock(); } 这种方法的优点是: 编程模型简单,如果小心控制上锁顺序,一般来说不会有死锁的问题: 可以通过调节锁的粒度来调节性能. 缺点是: 所有基于锁的算法都有死锁的可能: 上锁和解锁

ReferenceCountSet无锁实现

记得很久以前有一次面试被问到如何编写无锁程序,我当时觉得那个面试官脑子进水了,我们确实可以在某些情况下减少锁的使用,但是怎么可能避免呢?当然我现在还是持这种观点,在Java中,你可以有很多方法减少锁的使用(至少在你自己的代码中看起来): 1.     比如常见的可以使用volatile关键字来保证某个字段在一个线程中的更新对其他线程的可见性: 2.     可以使用concurrent.atomic包中的各种Atomic类来实现某些基本类型操作的,它主要采用忙等机制(CAS,compare an

PgSQL · 源码分析 · PG中的无锁算法和原子操作应用一则

原子操作概述 近年来随着服务器上CPU核数的不断增加,无锁算法(Lock Free)越来越广泛的被应用于高并发的系统中.PostgreSQL 做为世界上最高级开源数据库也在9.5时引入了无锁算法.本文先介绍了无锁算法和原子操作在PostgreSQL中的具体实现, 再通过一个Patch来看一下在PostgreSQL中是如何利用它来解决实际的高并发问题的. 无锁算法是利用CPU的原子操作实现的数据结构和算法来解决原来只能用锁才能解决的并发控制问题. 众所周知,在一个并发系统中特别是高并发的场景下,锁

Java中的线程同步与ThreadLocal无锁化线程封闭实现_java

Synchronized关键字 Java语言的关键字,当它用来修饰一个方法或者一个代码块的时候,能够保证在同一时刻最多只有一个线程执行该段代码. 当两个并发线程访问同一个对象object中的这个synchronized(this)同步代码块时,一个时间内只能有一个线程得到执行.另一个线程必须等待当前线程执行完这个代码块以后才能执行该代码块. 然而,当一个线程访问object的一个synchronized(this)同步代码块时,另一个线程仍然可以访问该object中的非synchronized(

java无锁hashmap原理与实现详解_java

java多线程环境中应用HashMap,主要有以下几种选择:使用线程安全的java.util.Hashtable作为替代​使用java.util.Collections.synchronizedMap方法,将已有的HashMap对象包装为线程安全的.使用java.util.concurrent.ConcurrentHashMap类作为替代,它具有非常好的性能.而以上几种方法在实现的具体细节上,都或多或少地用到了互斥锁.互斥锁会造成线程阻塞,降低运行效率,并有可能产生死锁.优先级翻转等一系列问题.

无锁并发框架Disruptor

概述 在逛并发编程网的时候,看到了并发框架Disruptor译文这个系列文章. Martin Fowler在自己网站上写了一篇LMAX架构(译文)的文章,在文章中他介绍了LMAX是一种新型零售金融交易平台,它能够以很低的延迟产生大量交易.这个系统是建立在JVM平台上,其核心是一个业务逻辑处理器,它能够在一个线程里每秒处理6百万订单.业务逻辑处理器完全是运行在内存中,使用事件源驱动方式.业务逻辑处理器的核心是Disruptor. Disruptor它是一个开源的并发框架,能够在无锁的情况下实现网络

无锁和无等待的定义和例子

原文链接,译文连接,译者:周可人,校对:梁海舰 在查阅google之后,我发现没有一处对并发算法或是数据结构规定的演进条件(progress condition,注:参考[1],译者认为翻译为演进状态更为合适)做合理的解释.甚至在"The Art of Multiprocessor Programming"中也只有围绕书本的一小段定义,大部分定义是单行的句子,因而造成了我们普通人含义模糊的理解,所以在这里我把我对这些概念的理解整理在一起,并且在每一种概念后面给出相应的例子. 我们先将演