Disruptor-NET和内存栅栏

Disruptor-NET算法(是一种无锁算法)需要我们自己实现某一种特定的内存操作的语义以保证算法的正确性。这时我们就需要显式的使用一些指令来控制内存操作指令的顺序以及其可见性定义。这种指令称为内存栅栏。

内存一致性模型需要在各种的程序与系统的各个层次上定义内存访问的行为。在机器码与的层次上,其定义将影响硬件的设计者以及机器码开发人员;而在高级语言层次上,其定义将影响高级语言开发人员以及编译器开发人员和硬件设计人员。即,内存操作的乱序在各个层次都是存在的。这里,所谓的程序的执行顺序有三种:

(1)程序顺序:指在特定CPU上运行的,执行内存操作的代码的顺序。这指的是编译好的程序二进制镜像中的指令的顺序。编译器并不一定严格按照程序的顺序进行二进制代码的编排。编译器可以按照既定的规则,在执行代码优化的时候打乱指令的执行顺序,也就是上面说的程序顺序。并且,编译器可以根据程序的特定行为进行性能优化,这种优化可能改变算法的形式与算法的执行复杂度。(例如将switch转化为表驱动序列)
(2)执行顺序:指在CPU上执行的独立的内存相关的代码执行的顺序。执行顺序和程序顺序可能不同,这种不同是编译器和CPU优化造成的结果。CPU在执行期(Runtime)根据自己的内存模型(跟编译器无关)打乱已经编译好了的指令的顺序,以达到程序的优化和最大限度的资源利用。

(3)感知顺序:指特定的CPU感知到他自身的或者其他CPU对内存进行操作的顺序。感知顺序和执行顺序可能还不一样。这是由于缓存优化或者内存优化系统造成的。

而最终的共享内存模型的表现形式是由这三种“顺序”共同决定的。即从源代码到最终执行进行了至少三个层次上的代码顺序调整,分别是由编译器和CPU完成的。我们上面提到,这种代码执行顺序的改变虽然在单线程程序中不会引发副作用,但是在多线程程序中,这种作用是不能够被忽略的,甚至可能造成完全错误的结果。因此,在多线程程序中,我们有时需要人为的限制内存执行的顺序。而这种限制是通过不同层次的内存栅栏完成的。

Thread.MemoryBarrier就是采用了CPU提供的某些特定的指令的内存栅栏,下面是msdn的解释【http://msdn.microsoft.com/zh-cn/library/vstudio/system.threading.thread.memorybarrier(v=vs.100).aspx】:

Thread.MemoryBarrier: 按如下方式同步内存访问:执行当前线程的处理器在对指令重新排序时,不能采用先执行 MemoryBarrier 调用之后的内存访问,再执行 MemoryBarrier 调用之前的内存访问的方式。

按照我个人的理解:就是写完数据之后,调用MemoryBarrier,数据就会立即刷新,另外在读取数据之前调用MemoryBarrier可以确保读取的数据是最新的,并且处理器对MemoryBarrier的优化小心处理。

int _answer;
        bool _complete;

        void A()
        {
            _answer = 123;
            Thread.MemoryBarrier(); //在写完之后,创建内存栅栏
            _complete = true;
            Thread.MemoryBarrier();//在写完之后,创建内存栅栏
       }

        void B()
        {
            Thread.MemoryBarrier();//在读取之前,创建内存栅栏
            if (_complete)
            {
                Thread.MemoryBarrier();//在读取之前,创建内存栅栏
                Console.WriteLine(_answer);
            }
        }

Disruptor-NET正是通过Thread.MemoryBarrier 实现无锁和线程安全的内存操作,看下面是他的Atomic的Volatile类对常用数据类型的封装,volatile可以阻止代码重排序,并且值被更新的时候,会导致缓存失效,强制回写到主存中。

/// <summary>
        /// An integer value that may be updated atomically
        /// </summary>
        public struct Integer
        {
            private int _value;

            /// <summary>
            /// Create a new <see cref="Integer"/> with the given initial value.
            /// </summary>
            /// <param name="value">Initial value</param>
            public Integer(int value)
            {
                _value = value;
            }

            /// <summary>
            /// Read the value without applying any fence
            /// </summary>
            /// <returns>The current value</returns>
            public int ReadUnfenced()
            {
                return _value;
            }

            /// <summary>
            /// Read the value applying acquire fence semantic
            /// </summary>
            /// <returns>The current value</returns>
            public int ReadAcquireFence()
            {
                var value = _value;
                Thread.MemoryBarrier();
                return value;
            }

            /// <summary>
            /// Read the value applying full fence semantic
            /// </summary>
            /// <returns>The current value</returns>
            public int ReadFullFence()
            {
                var value = _value;
                Thread.MemoryBarrier();
                return value;
            }

            /// <summary>
            /// Read the value applying a compiler only fence, no CPU fence is applied
            /// </summary>
            /// <returns>The current value</returns>
            [MethodImpl(MethodImplOptions.NoOptimization)]
            public int ReadCompilerOnlyFence()
            {
                return _value;
            }

            /// <summary>
            /// Write the value applying release fence semantic
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteReleaseFence(int newValue)
            {
                _value = newValue;
                Thread.MemoryBarrier();
            }

            /// <summary>
            /// Write the value applying full fence semantic
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteFullFence(int newValue)
            {
                _value = newValue;
                Thread.MemoryBarrier();
            }

            /// <summary>
            /// Write the value applying a compiler fence only, no CPU fence is applied
            /// </summary>
            /// <param name="newValue">The new value</param>
            [MethodImpl(MethodImplOptions.NoOptimization)]
            public void WriteCompilerOnlyFence(int newValue)
            {
                _value = newValue;
            }

            /// <summary>
            /// Write without applying any fence
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteUnfenced(int newValue)
            {
                _value = newValue;
            }

            /// <summary>
            /// Atomically set the value to the given updated value if the current value equals the comparand
            /// </summary>
            /// <param name="newValue">The new value</param>
            /// <param name="comparand">The comparand (expected value)</param>
            /// <returns></returns>
            public bool AtomicCompareExchange(int newValue, int comparand)
            {
                return Interlocked.CompareExchange(ref _value, newValue, comparand) == comparand;
            }

            /// <summary>
            /// Atomically set the value to the given updated value
            /// </summary>
            /// <param name="newValue">The new value</param>
            /// <returns>The original value</returns>
            public int AtomicExchange(int newValue)
            {
                return Interlocked.Exchange(ref _value, newValue);
            }

            /// <summary>
            /// Atomically add the given value to the current value and return the sum
            /// </summary>
            /// <param name="delta">The value to be added</param>
            /// <returns>The sum of the current value and the given value</returns>
            public int AtomicAddAndGet(int delta)
            {
                return Interlocked.Add(ref _value, delta);
            }

            /// <summary>
            /// Atomically increment the current value and return the new value
            /// </summary>
            /// <returns>The incremented value.</returns>
            public int AtomicIncrementAndGet()
            {
                return Interlocked.Increment(ref _value);
            }

            /// <summary>
            /// Atomically increment the current value and return the new value
            /// </summary>
            /// <returns>The decremented value.</returns>
            public int AtomicDecrementAndGet()
            {
                return Interlocked.Decrement(ref _value);
            }

            /// <summary>
            /// Returns the string representation of the current value.
            /// </summary>
            /// <returns>the string representation of the current value.</returns>
            public override string ToString()
            {
                var value = ReadFullFence();
                return value.ToString();
            }
        }
        /// <summary>
        /// An integer value that may be updated atomically and is guaranteed to live on its own cache line (to prevent false sharing)
        /// </summary>
        [StructLayout(LayoutKind.Explicit, Size = CacheLineSize * 2)]
        public struct PaddedInteger
        {
            [FieldOffset(CacheLineSize)]
            private int _value;

            /// <summary>
            /// Create a new <see cref="PaddedInteger"/> with the given initial value.
            /// </summary>
            /// <param name="value">Initial value</param>
            public PaddedInteger(int value)
            {
                _value = value;
            }

            /// <summary>
            /// Read the value without applying any fence
            /// </summary>
            /// <returns>The current value</returns>
            public int ReadUnfenced()
            {
                return _value;
            }

            /// <summary>
            /// Read the value applying acquire fence semantic
            /// </summary>
            /// <returns>The current value</returns>
            public int ReadAcquireFence()
            {
                var value = _value;
                Thread.MemoryBarrier();
                return value;
            }

            /// <summary>
            /// Read the value applying full fence semantic
            /// </summary>
            /// <returns>The current value</returns>
            public int ReadFullFence()
            {
                var value = _value;
                Thread.MemoryBarrier();
                return value;
            }

            /// <summary>
            /// Read the value applying a compiler only fence, no CPU fence is applied
            /// </summary>
            /// <returns>The current value</returns>
            [MethodImpl(MethodImplOptions.NoOptimization)]
            public int ReadCompilerOnlyFence()
            {
                return _value;
            }

            /// <summary>
            /// Write the value applying release fence semantic
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteReleaseFence(int newValue)
            {
                _value = newValue;
                Thread.MemoryBarrier();
            }

            /// <summary>
            /// Write the value applying full fence semantic
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteFullFence(int newValue)
            {
                _value = newValue;
                Thread.MemoryBarrier();
            }

            /// <summary>
            /// Write the value applying a compiler fence only, no CPU fence is applied
            /// </summary>
            /// <param name="newValue">The new value</param>
            [MethodImpl(MethodImplOptions.NoOptimization)]
            public void WriteCompilerOnlyFence(int newValue)
            {
                _value = newValue;
            }

            /// <summary>
            /// Write without applying any fence
            /// </summary>
            /// <param name="newValue">The new value</param>
            public void WriteUnfenced(int newValue)
            {
                _value = newValue;
            }

            /// <summary>
            /// Atomically set the value to the given updated value if the current value equals the comparand
            /// </summary>
            /// <param name="newValue">The new value</param>
            /// <param name="comparand">The comparand (expected value)</param>
            /// <returns></returns>
            public bool AtomicCompareExchange(int newValue, int comparand)
            {
                return Interlocked.CompareExchange(ref _value, newValue, comparand) == comparand;
            }

            /// <summary>
            /// Atomically set the value to the given updated value
            /// </summary>
            /// <param name="newValue">The new value</param>
            /// <returns>The original value</returns>
            public int AtomicExchange(int newValue)
            {
                return Interlocked.Exchange(ref _value, newValue);
            }

            /// <summary>
            /// Atomically add the given value to the current value and return the sum
            /// </summary>
            /// <param name="delta">The value to be added</param>
            /// <returns>The sum of the current value and the given value</returns>
            public int AtomicAddAndGet(int delta)
            {
                return Interlocked.Add(ref _value, delta);
            }

            /// <summary>
            /// Atomically increment the current value and return the new value
            /// </summary>
            /// <returns>The incremented value.</returns>
            public int AtomicIncrementAndGet()
            {
                return Interlocked.Increment(ref _value);
            }

            /// <summary>
            /// Atomically increment the current value and return the new value
            /// </summary>
            /// <returns>The decremented value.</returns>
            public int AtomicDecrementAndGet()
            {
                return Interlocked.Decrement(ref _value);
            }

            /// <summary>
            /// Returns the string representation of the current value.
            /// </summary>
            /// <returns>the string representation of the current value.</returns>
            public override string ToString()
            {
                var value = ReadFullFence();
                return value.ToString();
            }
        }

PaddedInteger类,它使用了7个Integer,加上一个对象头,刚好64个字节。

本文来自合作伙伴“doNET跨平台”,了解相关信息可以关注“opendotnet”微信公众号

时间: 2024-09-15 03:43:01

Disruptor-NET和内存栅栏的相关文章

内存屏障

原文地址  作者:Martin Thompson  译者:一粟   校对:无叶,方腾飞 本文我将和大家讨论并发编程中最基础的一项技术:内存屏障或内存栅栏,也就是让一个CPU处理单元中的内存状态对其它处理单元可见的一项技术. CPU使用了很多优化技术来实现一个目标:CPU执行单元的速度要远超主存访问速度.在上一篇文章 "Write Combing (合并写)"中我已经介绍了其中的一项技术.CPU避免内存访问延迟最常见的技术是将指令管道化,然后尽量重排这些管道的执行以最大化利用缓存,从而把

非一致性内存访问的读写锁

原文地址,译文地址,译者: 李杰聪,校对:郑旭东 原文作者: Irina Calciu         Brown University        irina@cs.brown.edu Dave Dice          Oracle Labs             dave.dice@oracle.com Yossi Lev           Oracle Labs             yossi.lev@oracle.com Victor Luchangco    Oracle

内存屏障与JVM并发

内存屏障,又称内存栅栏,是一组处理器指令,用于实现对内存操作的顺序限 制.本文介绍了内存屏障对多线程程序的影响.我们将研究内存屏障与JVM并发机 制的关系,如易变量(volatile).同步(synchronized)和原子条件式 (atomic conditional).本文假定读者已经充分掌握了相关概念和Java内存模 型,不讨论并发互斥.并行机制和原子性.内存屏障用来实现并发编程中称为可 见性(visibility)的同样重要的作用. 内存屏障为何重要? 对主存的一次访问一般花费硬件的数百

全面理解Java内存模型

Java内存模型即Java Memory Model,简称JMM.JMM定义了Java 虚拟机(JVM)在计算机内存(RAM)中的工作方式.JVM是整个计算机虚拟模型,所以JMM是隶属于JVM的. 如果我们要想深入了解Java并发编程,就要先理解好Java内存模型.Java内存模型定义了多线程之间共享变量的可见性以及如何在需要的时候对共享变量进行同步.原始的Java内存模型效率并不是很理想,因此Java1.5版本对其进行了重构,现在的Java8仍沿用了Java1.5的版本. 关于并发编程 在并发

Java内存访问重排序笔记

关于重排序 重排序通常是编译器或运行时环境为了优化程序性能而采取的对指令进行重新排序执行的一种手段. 重排序分为两类:编译期重排序和运行期重排序,分别对应编译时和运行时环境. As-if-serial语义 as-if-serial语义的意思是,所有的动作(Action)都可以为了优化而被重排序,但是必须保证它们重排序后的结果和程序代码本身的应有结果是一致的. Java编译器.运行时和处理器都会保证单线程下的as-if-serial语义. 比如,为了保证这一语义,重排序不会发生在有数据依赖的操作之

软件事务内存导论(二)软件事务内存

1.1    软件事务内存 将实体与状态分离的做法有助于STM(软件事务内存)解决与同步相关的两大主要问题:跨越内存栅栏和避免竞争条件.让我们先来看一下在Clojure上下文中的STM是什么样子,然后再在Java里面使用它. 通过将对内存的访问封装在事务(transactions)中,Clojure消除了内存同步过程中我们易犯的那些错误(见 <Programming Clojure>[Hal09]和<The Joy of Clojure>[FH11]).Clojure会敏锐地观察和

软件事务内存导论(三)用Akka/Multiverse STM实现并发

用Akka/Multiverse STM实现并发 上面我们已经学习了如何在Clojure里使用STM,我猜你现在一定很好奇如何在Java代码中使用STM.而对于这一需求,我们有如下选择: 直接在Java中使用Clojure STM.方法非常简单,我们只需将事务的代码封装在一个Callable接口的实现中就行了,详情请参见第7章. 喜欢用注解(annotation)的开发者可能会更倾向于使用Multiverse的STM API. 除了STM之外,如果我们计划使用角色(actor),那么还可以考虑选

软件事务内存导论(十一)-STM的局限性

1.1    STM的局限性 STM消除了显式的同步操作,所以我们在写代码时就无需担心自己是否忘了进行同步或是否在错误的层级上进行了同步.然而STM本身也存在一些问题,比如在跨越内存栅栏失败或遭遇竞争条件时我们捕获不到任何有用的信息.我似乎可以听到你内心深处那个精明的程序员在抱怨"怎么会这样啊?".确实,STM是有其局限性的,否则本书写到这里就应该结束了.STM只适用于写冲突非常少的应用场景,如果你的应用程序存在很多写操作竞争,那么我们就需要在STM之外寻找解决方案了. 下面让我们进一

并发数据结构:谈谈volatile变量

由来 在CLR 2.0 Memory Model中,我们知道现代CPU架构从CPU到Memory Controller每一级都有速度,容量 不同的高速缓存.之所以这样设计,主要是因为性能.为了进一步提升性能,当线程读取内存中所期望的 元素值时,CPU并不是只读取我们所期望的元素值,它实际上会同时读取该值周围的若干字节,并将其放 入高速缓存中.这是因为应用程序通常读取的字节在内存中彼此相邻.当应用程序又读取该值周围的字节 时,这些字节已经在高速缓存中了,这样就避免了应用程序再次访问内存,也提升了性