java-并发-并发容器(2)

和java.util.HashMap比起来,基本类似,有两个地方不同。一个是对onlyIfAbsent参数的判断处理。另一个则是对整个操作过程加锁,并在加锁的地方做了稍微巧妙的处理。就是在在等锁的过程中,不断的寻找和构建node节点对象,不管是否这个方法最终是否创建到了node节点,当这个方法返回时,一定是已经获得了这个Segment对应的锁。
而这个寻找和创建节点所在的循环,一方面是做节点的遍历查询,另一方面也是起到了自旋锁的作用,避免直接调用lock()而等锁阻塞,因为这样会在系统实现层面阻塞和唤醒线程,是有一定的切换成本的,对提高效率不利。
整个类的实现,都肯定会涉及到Segment的处理和其中方法的调用。对这些方法的调用,分为读操作和写处理,通常读操作没用用锁,而在修改操作,如remove() replace()等方法都和put()类似,在整个操作过程中对Segment进行了尝试加锁和自旋等锁的前提操作,并最后释放锁。这些写操作的等锁基本上和put()中的scanAndLockForPut()方法等同,除了需要创建节点。

从整体上看,由于Segment降低了并发中的锁粒度,并在写操作使用了锁。保证了整个ConcurrentHashMap的线程安全,也保证了并发运行时的效率。
在java.util.conccurent包中并没有TreeMap,但对于有序要求的容器,有基于skip list数据结构的Map实现ConcurrentSkipListMap,而且也有skip list的ConcurrentSkipListSet,和java.util包中的Set一样,是基于Map的.
ConcurrentHashMap是Java 5中支持高并发、高吞吐量的线程安全HashMap实现。在这之前我对ConcurrentHashMap只有一些肤浅的理解,仅知道它采用了多个锁,大概也足够了。但是在经过一次惨痛的面试经历之后,我觉得必须深入研究它的实现。面试中被问到读是否要加锁,因为读写会发生冲突,我说必须要加锁,我和面试官也因此发生了冲突,结果可想而知。还是闲话少说,通过仔细阅读源代码,现在总算理解ConcurrentHashMap实现机制了,其实现之精巧,令人叹服,与大家共享之。
锁分离 (Lock Stripping)
ConcurrentHashMap允许多个修改操作并发进行,其关键在于使用了锁分离技术。它使用了多个锁来控制对hash表的不同部分进行的修改。ConcurrentHashMap内部使用段(Segment)来表示这些不同的部分,每个段其实就是一个小的hash table,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。不变性是多线程编程占有很重要的地位,下面还要谈到。

/**
 * The segments, each of which is a specialized hash table
 */
final Segment<K,V>[] segments; 

不变(Immutable)和易变(Volatile)

ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将得到不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry代表每个hash链中的一个节点,其结构如下所示:

不变(Immutable)和易变(Volatile)

ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,读操作不加锁将得到不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry代表每个hash链中的一个节点,其结构如下所示:

可以看到除了value不是final的,其它值都是final的,这意味着不能从hash链的中间或尾部添加或删除节点,因为这需要修改next引用值,所有的节点的修改只能从头部开始。对于put操作,可以一律添加到Hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将要删除节点的前面所有节点整个复制一遍,最后一个节点指向要删除结点的下一个结点。这在讲解删除操作时还会详述。为了确保读操作能够看到最新的值,将value设置成volatile,这避免了加锁。

其它

为了加快定位段以及段中hash槽的速度,每个段hash槽的的个数都是2^n,这使得通过位运算就可以定位段和段中hash槽的位置。当并发级别为默认值16时,也就是段的个数,hash值的高4位决定分配在哪个段中。但是我们也不要忘记《算法导论》给我们的教训:hash槽的的个数不应该是2^n,这可能导致hash槽分配不均,这需要对hash值重新再hash一次。(这段似乎有点多余了 )
这是重新hash的算法,还比较复杂

private static int hash(int h) {
    // Spread bits to regularize both segment and index locations,
    // using variant of single-word Wang/Jenkins hash.
    h += (h <<  15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h <<   3);
    h ^= (h >>>  6);
    h += (h <<   2) + (h << 14);
    return h ^ (h >>> 16);
}  

定位段的方法:

final Segment<K,V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
} 

数据结构
关于Hash表的基础数据结构,这里不想做过多的探讨。Hash表的一个很重要方面就是如何解决hash冲突,ConcurrentHashMap和HashMap使用相同的方式,都是将hash值相同的节点放在一个hash链中。与HashMap不同的是,ConcurrentHashMap使用多个子Hash表,也就是段(Segment)。下面是ConcurrentHashMap的数据成员:

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V>
        implements ConcurrentMap<K, V>, Serializable {
    /**
     * Mask value for indexing into segments. The upper bits of a
     * key's hash code are used to choose the segment.
     */
    final int segmentMask;  

    /**
     * Shift value for indexing within segments.
     */
    final int segmentShift;  

    /**
     * The segments, each of which is a specialized hash table
     */
    final Segment<K,V>[] segments;
}  

所有的成员都是final的,其中segmentMask和segmentShift主要是为了定位段,参见上面的segmentFor方法。
每个Segment相当于一个子Hash表,它的数据成员如下:

   static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
        /**
         * The number of elements in this segment's region.
         */
        transient volatile int count;  

        /**
         * Number of updates that alter the size of the table. This is
         * used during bulk-read methods to make sure they see a
         * consistent snapshot: If modCounts change during a traversal
         * of segments computing size or checking containsValue, then
         * we might have an inconsistent view of state so (usually)
         * must retry.
         */
        transient int modCount;  

        /**
         * The table is rehashed when its size exceeds this threshold.
         * (The value of this field is always <tt>(int)(capacity *
         * loadFactor)</tt>.)
         */
        transient int threshold;  

        /**
         * The per-segment table.
         */
        transient volatile HashEntry<K,V>[] table;  

        /**
         * The load factor for the hash table.  Even though this value
         * is same for all segments, it is replicated to avoid needing
         * links to outer object.
         * @serial
         */
        final float loadFactor;
}  

count用来统计该段数据的个数,它是volatile,它用来协调修改和读取操作,以保证读取操作能够读取到几乎最新的修改。协调方式是这样的,每次修改操作做了结构上的改变,如增加/删除节点(修改节点的值不算结构上的改变),都要写count值,每次读取操作开始都要读取count的值。这利用了Java 5中对volatile语义的增强,对同一个volatile变量的写和读存在happens-before关系。modCount统计段结构改变的次数,主要是为了检测对多个段进行遍历过程中某个段是否发生改变,在讲述跨段操作时会还会详述。threashold用来表示需要进行rehash的界限值。table数组存储段中节点,每个数组元素是个hash链,用HashEntry表示。table也是volatile,这使得能够读取到最新的table值而不需要同步。loadFactor表示负载因子。
实现细节

public V remove(Object key) {
 hash = hash(key.hashCode());
    return segmentFor(hash).remove(key, hash, null);
}  

整个操作是先定位到段,然后委托给段的remove操作。当多个删除操作并发进行时,只要它们所在的段不相同,它们就可以同时进行。下面是Segment的remove方法实现:

V remove(Object key, int hash, Object value) {
    lock();
    try {
        int c = count - 1;
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;  

        V oldValue = null;
        if (e != null) {
            V v = e.value;
            if (value == null || value.equals(v)) {
                oldValue = v;
                // All entries following removed node can stay
                // in list, but all preceding ones need to be
                // cloned.
                ++modCount;
                HashEntry<K,V> newFirst = e.next;
                for (HashEntry<K,V> p = first; p != e; p = p.next)
                    newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                  newFirst, p.value);
                tab[index] = newFirst;
                count = c; // write-volatile
            }
        }
        return oldValue;
    } finally {
        unlock();
    }
}  

整个操作是在持有段锁的情况下执行的,空白行之前的行主要是定位到要删除的节点e。接下来,如果不存在这个节点就直接返回null,否则就要将e前面的结点复制一遍,尾结点指向e的下一个结点。e后面的结点不需要复制,它们可以重用。
http://www.ibm.com/developerworks/java/library/j-jtp08223/
第二个图其实有点问题,复制的结点中应该是值为2的结点在前面,值为1的结点在后面,也就是刚好和原来结点顺序相反,还好这不影响我们的讨论
整个remove实现并不复杂,但是需要注意如下几点。第一,当要删除的结点存在时,删除的最后一步操作要将count的值减一。这必须是最后一步操作,否则读取操作可能看不到之前对段所做的结构性修改。第二,remove执行的开始就将table赋给一个局部变量tab,这是因为table是volatile变量,读写volatile变量的开销很大。编译器也不能对volatile变量的读写做任何优化,直接多次访问非volatile实例变量没有多大影响,编译器会做相应优化。
接下来看put操作,同样地put操作也是委托给段的put方法。下面是段的put方法:

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    lock();
    try {
        int c = count;
        if (c++ > threshold) // ensure capacity
            rehash();
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key)))
            e = e.next;  

        V oldValue;
        if (e != null) {
            oldValue = e.value;
            if (!onlyIfAbsent)
                e.value = value;
        }
        else {
            oldValue = null;
            ++modCount;
            tab[index] = new HashEntry<K,V>(key, hash, first, value);
            count = c; // write-volatile
        }
        return oldValue;
    } finally {
        unlock();
    }
}  

该方法也是在持有段锁的情况下执行的,首先判断是否需要rehash,需要就先rehash。接着是找是否存在同样一个key的结点,如果存在就直接替换这个结点的值。否则创建一个新的结点并添加到hash链的头部,这时一定要修改modCount和count的值,同样修改count的值一定要放在最后一步。put方法调用了rehash方法,reash方法实现得也很精巧,主要利用了table的大小为2^n,这里就不介绍了。

修改操作还有putAll和replace。putAll就是多次调用put方法,没什么好说的。replace甚至不用做结构上的更改,实现要比put和delete要简单得多,理解了put和delete,理解replace就不在话下了,这里也不介绍了。
获取操作

首先看下get操作,同样ConcurrentHashMap的get操作是直接委托给Segment的get方法,直接看Segment的get方法:

V get(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key)) {
                V v = e.value;
                if (v != null)
                    return v;
                return readValueUnderLock(e); // recheck
            }
            e = e.next;
        }
    }
    return null;
}  

get操作不需要锁。第一步是访问count变量,这是一个volatile变量,由于所有的修改操作在进行结构修改时都会在最后一步写count变量,通过这种机制保证get操作能够得到几乎最新的结构更新。对于非结构更新,也就是结点值的改变,由于HashEntry的value变量是volatile的,也能保证读取到最新的值。接下来就是对hash链进行遍历找到要获取的结点,如果没有找到,直接访回null。对hash链进行遍历不需要加锁的原因在于链指针next是final的。但是头指针却不是final的,这是通过getFirst(hash)方法返回,也就是存在table数组中的值。这使得getFirst(hash)可能返回过时的头结点,例如,当执行get方法时,刚执行完getFirst(hash)之后,另一个线程执行了删除操作并更新头结点,这就导致get方法中返回的头结点不是最新的。这是可以允许,通过对count变量的协调机制,get能读取到几乎最新的数据,虽然可能不是最新的。要得到最新的数据,只有采用完全的同步。

最后,如果找到了所求的结点,判断它的值如果非空就直接返回,否则在有锁的状态下再读一次。这似乎有些费解,理论上结点的值不可能为空,这是因为put的时候就进行了判断,如果为空就要抛NullPointerException。空值的唯一源头就是HashEntry中的默认值,因为HashEntry中的value不是final的,非同步读取有可能读取到空值。仔细看下put操作的语句:tab[index] = new HashEntry

V readValueUnderLock(HashEntry<K,V> e) {
    lock();
    try {
        return e.value;
    } finally {
        unlock();
    }
}  

另一个操作是containsKey,这个实现就要简单得多了,因为它不需要读取值:

boolean containsKey(Object key, int hash) {
    if (count != 0) { // read-volatile
        HashEntry<K,V> e = getFirst(hash);
        while (e != null) {
            if (e.hash == hash && key.equals(e.key))
                return true;
            e = e.next;
        }
    }
    return false;
}  

跨段操作
有些操作需要涉及到多个段,比如说size(), containsValaue()。先来看下size()方法:

public int size() {
    final Segment<K,V>[] segments = this.segments;
    long sum = 0;
    long check = 0;
    int[] mc = new int[segments.length];
    // Try a few times to get accurate count. On failure due to
    // continuous async changes in table, resort to locking.
    for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
        check = 0;
        sum = 0;
        int mcsum = 0;
        for (int i = 0; i < segments.length; ++i) {
            sum += segments[i].count;
            mcsum += mc[i] = segments[i].modCount;
        }
        if (mcsum != 0) {
            for (int i = 0; i < segments.length; ++i) {
                check += segments[i].count;
                if (mc[i] != segments[i].modCount) {
                    check = -1; // force retry
                    break;
                }
            }
        }
        if (check == sum)
            break;
    }
    if (check != sum) { // Resort to locking all segments
        sum = 0;
        for (int i = 0; i < segments.length; ++i)
            segments[i].lock();
        for (int i = 0; i < segments.length; ++i)
            sum += segments[i].count;
        for (int i = 0; i < segments.length; ++i)
            segments[i].unlock();
    }
    if (sum > Integer.MAX_VALUE)
        return Integer.MAX_VALUE;
    else
        return (int)sum;
}  

size方法主要思路是先在没有锁的情况下对所有段大小求和,如果不能成功(这是因为遍历过程中可能有其它线程正在对已经遍历过的段进行结构性更新),最多执行RETRIES_BEFORE_LOCK次,如果还不成功就在持有所有段锁的情况下再对所有段大小求和。在没有锁的情况下主要是利用Segment中的modCount进行检测,在遍历过程中保存每个Segment的modCount,遍历完成之后再检测每个Segment的modCount有没有改变,如果有改变表示有其它线程正在对Segment进行结构性并发更新,需要重新计算。
其实这种方式是存在问题的,在第一个内层for循环中,在这两条语句sum += segments[i].count; mcsum += mc[i] = segments[i].modCount;之间,其它线程可能正在对Segment进行结构性的修改,导致segments[i].count和segments[i].modCount读取的数据并不一致。这可能使size()方法返回任何时候都不曾存在的大小,很奇怪javadoc居然没有明确标出这一点,可能是因为这个时间窗口太小了吧。size()的实现还有一点需要注意,必须要先segments[i].count,才能segments[i].modCount,这是因为segment[i].count是对volatile变量的访问,接下来segments[i].modCount才能得到几乎最新的值(前面我已经说了为什么只是“几乎”了)。这点在containsValue方法中得到了淋漓

public boolean containsValue(Object value) {
    if (value == null)
        throw new NullPointerException();  

    // See explanation of modCount use above  

    final Segment<K,V>[] segments = this.segments;
    int[] mc = new int[segments.length];  

    // Try a few times without locking
    for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
        int sum = 0;
        int mcsum = 0;
        for (int i = 0; i < segments.length; ++i) {
            int c = segments[i].count;
            mcsum += mc[i] = segments[i].modCount;
            if (segments[i].containsValue(value))
                return true;
        }
        boolean cleanSweep = true;
        if (mcsum != 0) {
            for (int i = 0; i < segments.length; ++i) {
                int c = segments[i].count;
                if (mc[i] != segments[i].modCount) {
                    cleanSweep = false;
                    break;
                }
            }
        }
        if (cleanSweep)
            return false;
    }
    // Resort to locking all segments
    for (int i = 0; i < segments.length; ++i)
        segments[i].lock();
    boolean found = false;
    try {
        for (int i = 0; i < segments.length; ++i) {
            if (segments[i].containsValue(value)) {
                found = true;
                break;
            }
        }
    } finally {
        for (int i = 0; i < segments.length; ++i)
            segments[i].unlock();
    }
    return found;
} 
时间: 2024-10-20 12:43:45

java-并发-并发容器(2)的相关文章

Java 高并发五:JDK并发包1详细介绍_java

在[高并发Java 二] 多线程基础中,我们已经初步提到了基本的线程同步操作.这次要提到的是在并发包中的同步控制工具. 1. 各种同步控制工具的使用 1.1 ReentrantLock ReentrantLock感觉上是synchronized的增强版,synchronized的特点是使用简单,一切交给JVM去处理,但是功能上是比较薄弱的.在JDK1.5之前,ReentrantLock的性能要好于synchronized,由于对JVM进行了优化,现在的JDK版本中,两者性能是不相上下的.如果是简

java 集合并发操作出现的异常ConcurrentModificationException_java

如Java中的容器Map: for(Person person : pList){ if(person.getGender()==Gender.MALE){ pList.remove(person); //不能在遍历期间进行 remove这个操作 } } Map在遍历时候通常 现获得其键值的集合Set,然后用迭代器Iterator来对Map进行遍历. 注意在遍历的过程中,只能对Map中的元素进行相应的处理,不能把Map元素增加或者把Map元素减少,也就是说,不能改变Map的size大小,就会出现

解决Java多线程并发的计数器问题

问题描述 解决Java多线程并发的计数器问题 3C public class Counter { public static int count = 0; public synchronized static void inc() { count++; } public static void main(String[] args) { //同时启动1000个线程,去进行i++计算,看看实际结果 for (int i = 0; i < 1000; i++) { new Thread(new Ru

《Java 7并发编程实战手册》第六章并发集合

由人民邮电出版社出版的<Java 7并发编程实战手册>终于出版了,译者是俞黎敏和申绍勇,该书将于近期上架.之前并发编程网组织翻译过此书,由于邮电出版社在并发网联系他们之前就找到了译者,所以没有采用并发网的译稿,但邮电出版社将于并发网展开合作,发布该书的样章(样章由并发网挑选,你也可以回帖告诉我们你想看哪一章的样章),并组织赠书活动回馈给活跃读者.活动详情请时刻关注并发网的微博和微信(微信号:ifeves),最后祝各位用餐愉快!:) 本章将介绍下列内容: 使用非阻塞式线程安全列表 使用阻塞式线程

Java 高并发十: JDK8对并发的新支持详解_java

1. LongAdder 和AtomicLong类似的使用方式,但是性能比AtomicLong更好. LongAdder与AtomicLong都是使用了原子操作来提高性能.但是LongAdder在AtomicLong的基础上进行了热点分离,热点分离类似于有锁操作中的减小锁粒度,将一个锁分离成若干个锁来提高性能.在无锁中,也可以用类似的方式来增加CAS的成功率,从而提高性能. LongAdder原理图: AtomicLong的实现方式是内部有个value 变量,当多线程并发自增,自减时,均通过CA

Java 高并发九:锁的优化和注意事项详解_java

摘要 本系列基于炼数成金课程,为了更好的学习,做了系列的记录. 本文主要介绍: 1. 锁优化的思路和方法 2. 虚拟机内的锁优化 3. 一个错误使用锁的案例 4. ThreadLocal及其源码分析 1. 锁优化的思路和方法 在[高并发Java 一] 前言中有提到并发的级别. 一旦用到锁,就说明这是阻塞式的,所以在并发度上一般来说都会比无锁的情况低一点. 这里提到的锁优化,是指在阻塞式的情况下,如何让性能不要变得太差.但是再怎么优化,一般来说性能都会比无锁的情况差一点. 这里要注意的是,在[高并

Java 高并发八:NIO和AIO详解_java

IO感觉上和多线程并没有多大关系,但是NIO改变了线程在应用层面使用的方式,也解决了一些实际的困难.而AIO是异步IO和前面的系列也有点关系.在此,为了学习和记录,也写一篇文章来介绍NIO和AIO. 1. 什么是NIO NIO是New I/O的简称,与旧式的基于流的I/O方法相对,从名字看,它表示新的一套Java I/O标 准.它是在Java 1.4中被纳入到JDK中的,并具有以下特性: NIO是基于块(Block)的,它以块为基本单位处理数据 (硬盘上存储的单位也是按Block来存储,这样性能

《Java 7并发编程实战手册》第五章Fork/Join框架

感谢人民邮电大学授权并发网发布此书样章,新书已上市,购买请进当当网 本章内容包含: 创建Fork/Join线程池 合并任务的结果 异步运行任务 在任务中抛出异常 取消任务 5.1 简介 通常,使用Java来开发一个简单的并发应用程序时,会创建一些Runnable对象,然后创建对应的Thread 对象来控制程序中这些线程的创建.执行以及线程的状态.自从Java 5开始引入了Executor和ExecutorService接口以及实现这两个接口的类(比如ThreadPoolExecutor)之后,使

java多线程并发问题求解

问题描述 java多线程并发问题求解 父类中定义了几个成员变量String类型 a,b,c,这个父类被几个子类共同继承了, 各个子类中在构造器内初始化了a,b,c变量,问多线程调用每一个子类时会 产生并发问题吗? 父类: 解决方案 首先,需要看你的这类是如何设计的,如果只是提供 了构造函数来初始化这几个成员变量,而没有提供外界修改方法如setA...等方法的话,那么你这个类就是线程安全的,因为对象的信息不可能被外界改变. 如果提供了修改方法,那么对于同一个对象,置于多线程访问条件下,就有可能出现

Java 高并发六:JDK并发包2详解_java

1. 线程池的基本使用 1.1.为什么需要线程池 平时的业务中,如果要使用多线程,那么我们会在业务开始前创建线程,业务结束后,销毁线程.但是对于业务来说,线程的创建和销毁是与业务本身无关的,只关心线程所执行的任务.因此希望把尽可能多的cpu用在执行任务上面,而不是用在与业务无关的线程创建和销毁上面.而线程池则解决了这个问题,线程池的作用就是将线程进行复用. 1.2.JDK为我们提供了哪些支持  JDK中的相关类图如上图所示. 其中要提到的几个特别的类. Callable类和Runable类相似,