软件事务内存导论(四)创建事务

创建事务

我们创建事务的目的是为了协调针对多个托管引用的变更。事务将会保证这些变更是原子的,也就是说,所有的托管引用要么全部被提交要么全部被丢弃,所以在事务之外我们将不会看到有任何局部变更(partial changes)出现。此外,我们也可以用创建事务的方式来解决对单个ref先读后写所引发的相关问题。

Akka是用Scala开发出来的,所以如果我们工作中用的是Scala的话,就可以直接幸福地享用Akka简洁明了的API了。对于那些日常工作中不能使用Scala开发的程序员,Akka同样也提供了一组方便的API,以帮助他们通过Java语言来使用该类库的功能。本节我们将会看到如何利用Akka在Java和Scala中创建事务。

首先我们需要选一个适合用事务来解决的例子。我们在第5章中重构的EnergySource类使用了显式的加锁和解锁操作(其最终版本详见5.7节),下面让就我们将这些显式的加锁/解锁操作换用Akka的事务API来实现。

在Java中创建事务

为了将代码逻辑封装到一个事务中,我们需要创建一个Atomic类的实例并将代码放到该类的atomically()函数里。随后,我们可以通过调用Atomic实例的execute()函数来执行事务代码。类似于下面这样:

1 return  new  Atomic<Object>()  {
2     public  Object  atomically()  {
3         //code  to  run  in  a  transaction...
4         return  resultObject;
5     }
6 }.execute();

调用execute()函数的线程将负责执行atomically()函数里的代码。然而如果调用者本身并没有处在一个事务中的话,那么这个调用将会被封装在一个新的事务中。

下面让我们用Akka事务来重新实现EnergySource。首先,让我们将不可变状态封装到可变的Akka托管引用中去。

1 public  class  EnergySource  {
2     private  final  long  MAXLEVEL  =  100;
3     final  Ref<Long>  level  =  new  Ref<Long>(MAXLEVEL);
4     final  Ref<Long>  usageCount  =  new  Ref<Long>(0L);
5     final  Ref<Boolean>  keepRunning  =  new  Ref<Boolean>(true);
6     private  static  final  ScheduledExecutorService  replenishTimer  =
7         Executors.newScheduledThreadPool(10);

在这段变量定义的代码中,level和usageCount都被声明为Akka Ref,并且各自持有一个不可变的Long类型的值。于是在Java中我们就不能更改这些Long类型的值了,但我们仍然可以通过更改托管引用(即实体)使其安全地指向新值。

在EnergySource的上一个版本中,ScheduledExecutorService会周期性地(每秒钟一次)调用replenish()函数直至整个任务结束,这就要求stopEnergySource()必须是同步的。而在这个版本中,我们不用再周期性地调用replenish()函数,而只会在对象实例初始化的时候执行一下调度操作。在每次调用replenish()函数时,我们都会根据keepRunning的值来决定该函数是否应该在1秒之后再次被调度执行。这一变化消除了stopEnergySource()函数和调度器/计时器(timer)之间的耦合。相反地,stopEnergySource()函数现在只依赖于keepRunning这个标志,而该标志可以很容易地通过STM事务来行管理。

在这一版的代码中,由于可以依赖事务来保证安全性,所以我们没必要再对stopEnergySource()函数进行同步了。同时,由于swap()函数本身就是以事务方式执行的,

01 private  EnergySource()  {}
02 private  void  init()  {
03     replenishTimer.schedule(new  Runnable()  {
04         public  void  run()  {
05             replenish();
06             if  (keepRunning.get())  replenishTimer.schedule(
07                 this,  1,  TimeUnit.SECONDS);
08         }
09     },  1,  TimeUnit.SECONDS);
10 }
11 public  static  EnergySource  create()  {
12     final  EnergySource  energySource  =  new  EnergySource();
13     energySource.init();
14     return  energySource;
15 }
16 public  void  stopEnergySource()  {  keepRunning.swap(false);  }

如下所示,返回当前电量和使用次数的方法将会用到托管引用,但也只是需要调用一下get()函数而已。

1 public  long  getUnitsAvailable()  {  return  level.get();  }
2 public  long  getUsageCount()  {  return  usageCount.get();  }

在getUnitsAvailable()函数和getUsageCount()函数中,由于其中的get()函数都是以事务方式运行的,所以无需显式地将它们封装在事务里。

由于我们会在useEnergy()函数中同时修改电量和使用次数,所以useEnergy()函数需要使用一个显式的事务来完成这些操作。在这里,我们需要保证对所有被读取的值的变更都能保持一致性,即确保对这两个字段的变更是原子的。为了实现这一目标,我们将使用Atomic接口,并用atomically()函数将我们的逻辑代码封装到一个事务中。

01 public  boolean  useEnergy(final  long  units)  {
02     return    new  Atomic<Boolean>()  {
03         public  Boolean  atomically()  {
04             long  currentLevel  =  level.get();
05             if(units  >  0  &&  currentLevel  >=  units)  {
06                 level.swap(currentLevel  -  units);
07                 usageCount.swap(usageCount.get()  +  1);
08                 return  true;
09             else  {
10                 return  false;
11             }
12         }
13     }.execute();
14 }

useEnergy()函数的功能是从当前电量中减掉所消耗的电量(即unit——译者注)。为了实现这一目标,我们需要保证所涉及到的get和set操作都在同一个事务中完成,所以我们把所有相关操作都用atomically()函数封装了起来。最后,我们会调用execute()函数来启动事务并顺序执行的所有操作。

除了上述方法之外,我们还需要关注一下负责给电源充电的replenish()函数。由于这个方法也需要使用事务,所以其实现代码同样需要用Atomic进行封装。

01     private  void  replenish()  {
02         new  Atomic()  {
03             public  Object  atomically()  {
04                 long  currentLevel  =  level.get();
05                 if  (currentLevel  <  MAXLEVEL)  level.swap(currentLevel  +  1);
06                 return  null;
07             }
08         }.execute();
09     }
10 }

下面是针对EnergySource类的测试代码。其主要功能是,用多个线程并发地使用电池,每使用一次消耗一格电,每个线程最多会消耗7格电量。

01 public  class  UseEnergySource  {
02     private  static  final  EnergySource  energySource  =  EnergySource.create();
03     public  static  void  main(final  String[]  args)
04         throws  InterruptedException,  ExecutionException  {
05         System.out.println("Energy  level  at  start:  "  +
06         energySource.getUnitsAvailable());
07         List<Callable<Object>>  tasks  =  new  ArrayList<Callable<Object>>();
08         for(int  i  =  0;  i  <  10;  i++)  {
09             tasks.add(new  Callable<Object>()  {
10                 public  Object  call()  {
11                     for(int  j  =  0;  j  <  7;  j++)  energySource.useEnergy(1);
12                     return  null;
13                 }
14             });
15         }
16         final  ExecutorService  service  =  Executors.newFixedThreadPool(10);
17         service.invokeAll(tasks);
18         System.out.println("Energy  level  at  end:  "  +
19         energySource.getUnitsAvailable());
20         System.out.println("Usage:  "  +  energySource.getUsageCount());
21         energySource.stopEnergySource();
22         service.shutdown();
23     }
24 }

上述代码需要把Akka相关的Jar添加到Java的classpath中才能编译通过,所以首先我们需要创建一个标识jar位置的环境变量:

export  AKKA_JARS="$AKKA_HOME/lib/scala-library.jar:\
$AKKA_HOME/lib/akka/akka-stm-1.1.3.jar:\
$AKKA_HOME/lib/akka/akka-actor-1.1.3.jar:\
$AKKA_HOME/lib/akka/multiverse-alpha-0.6.2.jar:\
$AKKA_HOME/config:\
."

Classpath的定义取决于你使用的操作系统以及Akka在你的操作系统中被安装的位置。我们可以用javac编译器来编译代码,并用java命令来负责执行,具体细节如下所示:

javac  -classpath  $AKKA_JARS  -d  .  EnergySource.java  UseEnergySource.java
java  -classpath  $AKKA_JARS  com.agiledeveloper.pcj.UseEnergySource

万事俱备,下面让我们来编译并执行这段代码。通过代码的实现逻辑我们知道,电源初始有100格电量,而我们创建的10个线程将会消耗掉其中的70格电量,所以最后电源应该净剩30格电量。但由于电池电量会每秒回复一格,所以每次运行结果可能会稍有不同,比如最后净剩电量可能是31格而不是30格。

Energy  level  at  start:  100
Energy  level  at  end:  30
Usage:  70

默认情况下,Akka会将额外的日志消息打印到标准输出上。停掉这个默认的输出也很容易,我们只需要在$AKKA_HOME/config目录下创建一个名为logback.xml的文件,并在里面添加这项配置即可。由于这个文件位于classpath中,所以logger会自动找到这个文件、读取其中的配置并停掉消息输出。除此之外,我们还可以在这个配置文件中设置很多其他有用的配置项。详情请见http://logback.qos.ch/manual/configuration.html

正如我们在本例中所看到的那样,Akka是在后台默默地对事务进行管理的,所以请你多花些时间研究一下上述示例代码,并对事务和线程的运作过程多做一些尝试以便加深对这块知识的理解。

在Scala中创建事务

我们之前已经看到了如何在Java中创建事务(并且我假设你已经阅读过那一部分,所以这里我们就不再赘述了),下面我们将会在Scala中用更少的代码来完成同样的功能。我们之所以能兼顾简洁与功能,部分得益于Scala自身简洁的特点,但更多还是由于Akka API使用了闭包/函数值(closures/function values)的缘故。

相比Java的繁冗,我们在Scala中可以通过很简洁的方法来创建事务。我们所需要做的只是调用一下Stm的auomic()函数就行了,如下所示:

1 atomic  {
2     //code  to  run  in  a  transaction....
3     /*  return  */  resultObject
4 }

其中,我们传给atomic()的闭包/函数值仅在当前线程所运行的那个事务内可见。

下面就是使用了Akka事务的Scala版本的EnergySource实现代码:

01 class  EnergySource  private()  {
02     private  val  MAXLEVEL  =  100L
03     val  level  =  Ref(MAXLEVEL)
04     val  usageCount  =  Ref(0L)
05     val  keepRunning  =  Ref(true)
06     private  def  init()  =  {
07         EnergySource.replenishTimer.schedule(new  Runnable()  {
08             def  run()  =  {
09                 replenish
10                 if  (keepRunning.get)  EnergySource.replenishTimer.schedule(
11                     this1,  TimeUnit.SECONDS)
12             }
13         },  1,  TimeUnit.SECONDS)
14     }
15     def  stopEnergySource()  =  keepRunning.swap(false)
16     def  getUnitsAvailable()  =  level.get
17     def  getUsageCount()  =  usageCount.get
18     def  useEnergy(units  :  Long)  =  {
19         atomic  {
20             val  currentLevel  =  level.get
21             if(units  >  0  &&  currentLevel  >=  units)  {
22                 level.swap(currentLevel  -  units)
23                 usageCount.swap(usageCount.get  +  1)
24                 true
25             else  false
26         }
27     }
28     private  def  replenish()  =
29         atomic  {  if(level.get  <  MAXLEVEL)  level.swap(level.get  +  1)  }
30 }
31 object  EnergySource  {
32     val  replenishTimer  =  Executors.newScheduledThreadPool(10)
33     def  create()  =  {
34         val  energySource  =  new  EnergySource
35         energySource.init
36         energySource
37     }
38 }

作为一个完全的面向对象语言,Scala认为静态方法是不适合放在类的定义中的,所以工厂方法create()就被移到其伴生对象里面去了。余下的代码和Java版本非常相近,只是较之更为简洁。同时,由于使用了优雅的atomic()函数,我们就可以抛开Atomic类和execute()函数调用了。

Scala版本的EnergySource的测试用例如下所示。在并发和线程控制的实现方面,我们既可以像Java版本那样采用JDK的ExecutorService来管理线程,也可以使用Scala的角色(actor)[1] 来为每个并发任务分配执行线程。这里我们将采用第二种方式。当任务完成之后,每个任务都会给调用者返回一个响应,而调用者则需要等待所有任务结束之后才能继续执行。

01 object  UseEnergySource  {
02     val  energySource  =  EnergySource.create()
03     def  main(args  :  Array[String])  {
04         println("Energy  level  at  start:  "  +  energySource.getUnitsAvailable())
05         val  caller  =  self
06         for(i  <-  1  to  10)  actor  {
07             for(j  <-  1  to  7)  energySource.useEnergy(1)
08             caller  !  true
09         }
10         for(i  <-  1  to  10)  {  receiveWithin(1000)  {  case  message  =>  }  }
11         println("Energy  level  at  end:  "  +  energySource.getUnitsAvailable())
12         println("Usage:  "  +  energySource.getUsageCount())
13         energySource.stopEnergySource()
14     }
15 }

我们可以采用如下命令来引入Akka相关的Jar并编译运行上述代码,其中环境变量AKKA_JARS与我们在Java示例中的定义相同:

scalac  -classpath  $AKKA_JARS  *.scala
java  -classpath  $AKKA_JARS  com.agiledeveloper.pcj.UseEnergySource

Scala版本代码的输出结果与我们在Java版本中所看到的没什么两样,并同样依赖于电量恢复的节奏,即可能最终剩余电量是31而不是30。

Energy  level  at  start:  100
Energy  level  at  end:  30
Usage:  70


[1]这里提到Scala的角色(actor)仅仅是为了说明有这种方法可供使用。后面我们还将会学习如何使用功能更为强大的Akka actor。

时间: 2024-10-03 00:21:10

软件事务内存导论(四)创建事务的相关文章

软件事务内存导论(五)创建嵌套事务

1.1    创建嵌套事务 在之前的示例中,每个用到事务的方法都是各自在其内部单独创建事务,并且事务所涉及的变动也都是各自独立提交的.但如果我们想要将多个方法里的事务调整成一个统一的原子操作的时候,上述做法就无能为力了,所以我们需要使用嵌套事务来实现这一目标. 通过使用嵌套事务,所有被主控函数调用的那些函数所创建的事务都会默认被整合到主控函数的事务中.除此之外,Akka/Multiverse还提供 了很多其他配置选项,如新隔离事务(new isolated transactions)等.总之,使

软件事务内存导论(七)阻塞事务

阻塞事务--有意识地等待 我们经常会遇到这样一种情况,即某事务T能否成功完成依赖于某个变量是否发生了变化,并且由于这种原因所引起的事务运行失败也可能只是暂时性的.作为对这种暂时性失败的响应,我们可能会返回一个错误码并告诉事务T等待一段时间之后再重试.然而在事务T等待期间,即使其他任务已经更改了事务T所依赖的数据,事务T也没法立即感知到并重试了.为了解决这一问题,Akka为我们提供了一个简单的工具--retry(),该函数可以先将事务进行回滚,并将事务置为阻塞状态直到该事物所依赖的引用对象发生变化

软件事务内存导论(二)软件事务内存

1.1    软件事务内存 将实体与状态分离的做法有助于STM(软件事务内存)解决与同步相关的两大主要问题:跨越内存栅栏和避免竞争条件.让我们先来看一下在Clojure上下文中的STM是什么样子,然后再在Java里面使用它. 通过将对内存的访问封装在事务(transactions)中,Clojure消除了内存同步过程中我们易犯的那些错误(见 <Programming Clojure>[Hal09]和<The Joy of Clojure>[FH11]).Clojure会敏锐地观察和

软件事务内存导论

前言 软件事务内存 用Akka/Multiverse STM实现并发 创建事务 创建嵌套事务 配置Akka事务 阻塞事务 提交和回滚事件 集合与事务 处理写偏斜异常 STM的局限性 文章转自 并发编程网-ifeve.com

软件事务内存导论(三)用Akka/Multiverse STM实现并发

用Akka/Multiverse STM实现并发 上面我们已经学习了如何在Clojure里使用STM,我猜你现在一定很好奇如何在Java代码中使用STM.而对于这一需求,我们有如下选择: 直接在Java中使用Clojure STM.方法非常简单,我们只需将事务的代码封装在一个Callable接口的实现中就行了,详情请参见第7章. 喜欢用注解(annotation)的开发者可能会更倾向于使用Multiverse的STM API. 除了STM之外,如果我们计划使用角色(actor),那么还可以考虑选

软件事务内存导论(十一)-STM的局限性

1.1    STM的局限性 STM消除了显式的同步操作,所以我们在写代码时就无需担心自己是否忘了进行同步或是否在错误的层级上进行了同步.然而STM本身也存在一些问题,比如在跨越内存栅栏失败或遭遇竞争条件时我们捕获不到任何有用的信息.我似乎可以听到你内心深处那个精明的程序员在抱怨"怎么会这样啊?".确实,STM是有其局限性的,否则本书写到这里就应该结束了.STM只适用于写冲突非常少的应用场景,如果你的应用程序存在很多写操作竞争,那么我们就需要在STM之外寻找解决方案了. 下面让我们进一

软件事务内存导论(六)配置Akka事务

配置Akka事务 默认情况下,Akka为其相关的运行参数都设定了默认值,我们可以通过代码或配置文件akka.conf来更改这些默认设置.如果想了解如何指定或修改该配置文件位置的详细信息,请参阅Akka的文档. 针对单个事务,我们可以利用TransactionFactory在程序代码中更改其设置.下面就让我们用这种方式先后在Java和Scala中更改一些设置来为你展示如何实现设置的变更. 在Java中对事务进行配置 01 public  class  CoffeePot  { 02     pri

软件事务内存导论(十)处理写偏斜异常

处理写偏斜异常 在6.6节中,我们曾经简单讨论了写偏斜(write skew)以及Clojure STM是如何解决这个问题的.Akka同样提供了处理写偏斜问题的支持,但是需要我们配置一下才能生效.OK,一听到配置这个词可能让你觉得有些提心吊 胆,但实际操作起来其实起来还是蛮简单的.下面就让我们首先了解一下Akka在不进行任何配置情况下的默认行为. 让我们回顾一下之前曾经见到过的那个多个账户共享同一个联合余额最低限制例子.首先我们创建了一个名为Portfolio的类来保存支票账户余额和 储蓄账户余

软件事务内存导论(九) 集合与事务

集合与事务 在我们努力学习这些示例的过程中,很容易就会忘记我们所要处理的值都必须是不可变的.只有实体才是可变的,而状态值则是不可变的.虽然STM已经为我们减轻了很多负担,但如果想要在维护不可变性的同时还要兼顾性能的话,对我们来说也将是一个非常严峻的挑战. 为了保证不可变性,我们采取的第一个步骤是将单纯用来保存数据的类(value classes)及其内部所有成员字段都置为final(在Scala中是val).然后,我们需要传递地保证我们自己定义的类里面的字段所使用的类也都 是不可变的.可以说,将