讨喜的隔离可变性(三)创建角色

正如前面曾经提到过的那样,虽然我们有很多支持角色的类库可供选择,但是在本书中我们将使用Akka。这是一个基于Scala的类库,该类库拥有非常好的性能和可扩展性、并同时支持角色和STM。此外,该类库还可以被用于多种JVM上的语言中。在本章中,我们将注意力集中在Java和Scala身上。而在下一章,我们将会学习如何在其他语言中使用Akka的角色。

 

图 8‑2 某个角色的生存周期

由于Akka是用Scala实现的,所以在Scala中创建和使用角色非常简单并且更加自然,从Akka API的实现里我们也可以看到Scala简约而不简单的风格闪耀其中。除此之外,Akka的开发者们还设计了一套相当出色的传统Java API,可以使我们在Java代码中很方便地创建和使用角色。下面我们将先学习如何在Java中使用这套API,然后再体验一下用Scala时将有着怎样的简化和改变。

用Java创建角色

在Akka中,抽象类akka.actor.UntypedActor用于表示一个角色的抽象表示,而具体的角色定义则只需简单继承这个抽象类并实现其onReceive()函数就可以了——每当有消息到达此角色时该函数将被调用。下面让我们通过一个简单的实例来对上述过程建立一个直观感受。下面我们将会创建一个角色(actor)…不如就写一个可以对扮演不同荧幕人物(role)的请求进行响应的HollywoodActor咋样?

1 <br />
2 public class HollywoodActor extends UntypedActor {<br />
3 public void onReceive(final Object role) {<br />
4 System.out.println(&quot;Playing &quot; + role +<br />
5 &quot; from Thread &quot; + Thread.currentThread().getName());<br />
6 }<br />
7 }<br />

如上所示,onReceive()函数接受一个Object对象作为其参数。在本例中,我们只是简单地将该参数以及负责处理消息的线程的详情打印出来。稍后我们将会学习如何处理不同类型的消息。

在完成了角色(actor)的定义之后,我们还需要创建一个角色的实例,并将该角色(actor)曾经演过的荧幕人物(role)以消息的形式发送给它,下面让我们来实现这部分内容:

01 <br />
02 public class UseHollywoodActor {<br />
03 public static void main(final String[] args) throws InterruptedException {<br />
04 final ActorRef johnnyDepp = Actors.actorOf(HollywoodActor.class).start();<br />
05 johnnyDepp.sendOneWay(&quot;Jack Sparrow&quot;);<br />
06 Thread.sleep(100);<br />
07 johnnyDepp.sendOneWay(&quot;Edward Scissorhands&quot;);<br />
08 Thread.sleep(100);<br />
09 johnnyDepp.sendOneWay(&quot;Willy Wonka&quot;);<br />
10 Actors.registry().shutdownAll();<br />
11 }<br />
12 }<br />

在Java中我们通常都是用new来创建对象的,但由于Akka的角色并非简单对象而是活动对象(active objects),所以我们需要用一个特殊函数actorOf()来完成创建动作。此外,我们还可以先用new生成一个实例,然后再调用actorOf()对该实例进行封装以获得一个角色的引用,关于这种创建方式我们稍后会再研究具体细节。当我们创建好了角色之后,就可以通过调用其start()函数来启动该角色。而当我们启动一个角色时,Akka会将其写入一个注册表(registry)中,于是在这个角色停止运行之前我们都可以通过注册表来访问它。在本例中,johnnyDeep即为角色实例的引用,其类型为ActorRef。

接下来,我们通过sendOneWay()函数向johnnyDeep发送了一些附带着我们希望其扮演的荧幕人物(role)的消息。当消息发出之后,其实我们本不用加入那几个100毫秒等待时间的,但插入延时将有助于我们更好地学习角色如何进行线程切换的运作细节。在代码的结尾处,我们关闭了所有运行中的角色。除了代码示例中所使用的shutdownAll()之外,我们还可以逐个调用每个角色的stop()函数或给所有角色发送kill消息的方式来达到关停所有角色的目的。

为了能够运行上面的实例,我们需要先把Akka的库文件都添加到classpath中,然后通过javac对代码进行编译。编译完成之后,我们就可以像运行其他常规Java程序一样运行本节的示例程序。需要再次提醒你的是,请务必记得将所有相关的JAR都添加到classpath中。下面就是我在我的系统上所使用的编译和运行指令:

1 <br />
2 javac -d . -classpath $AKKA_JARS HollywoodActor.java UseHollywoodActor.java<br />
3 java -classpath $AKKA_JARS com.agiledeveloper.pcj.UseHollywoodActor<br />

其中AKKA_JARS的定义如下所示:

1 <br />
2 export AKKA_JARS=&quot;$AKKA_HOME/lib/scala-library.jar:\<br />
3 $AKKA_HOME/lib/akka/akka-stm-1.1.3.jar:\<br />
4 $AKKA_HOME/lib/akka/akka-actor-1.1.3.jar:\<br />
5 $AKKA_HOME/lib/akka/multiverse-alpha-0.6.2.jar:\<br />
6 $AKKA_HOME/lib/akka/akka-typed-actor-1.1.3.jar:\<br />
7 $AKKA_HOME/lib/akka/aspectwerkz-2.2.3.jar:\<br />
8 $AKKA_HOME/config:\<br />
9 .&quot;<br />

为了使实例代码能否顺利地编译运行,请根据你所使用的操作系统来定义AKKA_JARS环境变量,以便编译器能够正确定位到Scala和Akka的安装路径。其中,scala-library.jar是scala相关的功能集合,而我们既可以使用Akka自带的jar,也可以使用Scala安装路径下的那一份。

默认情况下Akka会将额外的日志消息输出到控制台,关于如何对这一行为进行配置请参阅6.8节。

下面让我们编译并运行示例代码,并观察角色对于消息的响应情况:

1 <br />
2 Playing Jack Sparrow from Thread akka:event-driven:dispatcher:global-1<br />
3 Playing Edward Scissorhands from Thread akka:event-driven:dispatcher:global-2<br />
4 Playing Willy Wonka from Thread akka:event-driven:dispatcher:global-3<br />

通过输出结果我们可以看到,示例角色每次只响应一个消息,并且每次运行角色的线程都是不同的。对于消息处理的过程而言,既可以一个线程处理多个消息,也可以像本例这样由不同线程处理不同的消息——但无论是哪种处理模式,在任意时刻都只能有一个消息被处理。该模式的关键点在于,所有角色都是单线程的,但是在陷入等待状态时角色会优雅地将线程释放而不是抓住线程不撒手。我们在发送消息之后插入的sleep语句的目的就是为了将actor引入等待状态以便更清晰地演示这一运作细节。

上例中,我们创建角色时没有带任何构造函参。而如果需要的话,我们可以在角色的创建过程中引入一些参数。例如,我们可以用好莱坞演员的名字来初始化之前的HollywoodActor:

01 <br />
02 public class UseHollywoodActor {<br />
03 public static void main(final String[] args) throws InterruptedException {<br />
04 final ActorRef tomHanks = Actors.actorOf(new UntypedActorFactory() {<br />
05 public UntypedActor create() { return new HollywoodActor(&quot;Hanks&quot;); }<br />
06 }).start();<br />
07 tomHanks.sendOneWay(&quot;James Lovell&quot;);<br />
08 tomHanks.sendOneWay(new StringBuilder(&quot;Politics&quot;));<br />
09 tomHanks.sendOneWay(&quot;Forrest Gump&quot;);<br />
10 Thread.sleep(1000);<br />
11 tomHanks.stop();<br />
12 }<br />
13 }<br />

新版的HollywoodActor类的构造函数定义了一个名为name的String类型参数。而在onReceive()函数中,我们对于不能识别的消息进行了专门的处理,即简单地在屏幕输出该好莱坞演员未曾饰演过那个未识别的消息所代表的荧幕人物(role)。当然我们也可以采取其他动作,比如返回一个错误码、打日志、向上层调用者抛异常等等。下面让我们看看如何将给这个构造函数传递参数:

01 <br />
02 public class HollywoodActor extends UntypedActor {<br />
03 private final String name;<br />
04 public HollywoodActor(final String theName) { name = theName; }<br />
05 public void onReceive(final Object role) {<br />
06 if(role instanceof String)<br />
07 System.out.println(String.format(&quot;%s playing %s&quot;, name, role));<br />
08 else<br />
09 System.out.println(name + &quot; plays no &quot; + role);<br />
10 }<br />
11 }<br />

一般情况下,我们都是通过发送消息而不是直接调用函数的方式与角色进行交互的。Akka不希望我们拿到角色的直接引用,而是希望我们只针对ActorRef的引用进行操作。这样一来,Akka就可以确保我们不会往角色里添加其他函数,并且也不会与角色实例进行直接的交互。直接操纵角色实例的行为会将我们带回到共享可变性的泥淖中,而这正是我们极力想要避免。此外,这种受控的角色创建方式也便于Akka更好地回收废弃的角色。所以如果我们试图直接创建一个角色类的实例,Akka将抛出一个内容为“请不要用’new’操作符显示地创建角色实例”的akka.actor.ActorInitializationException异常。

Akka允许我们以一种受控的方式创建角色实例,即我们可以在一个匿名类中实现UntypedActorFactory接口,并在其create()函数中实现创建角色实例的逻辑。而接下来的actorOf()则把一个继承自UntypedActor的普通对象转换为为一个Akka角色。随后,我们和之前一样向这个actor发送几条消息并观察输出结果。

在本例中,HollywoodActor只接受String类型的消息,但我们在测试用例中向其发送了一条值为Politics、类型为StringBuilder的消息。而我们在onReceive()函数中设计的检查逻辑将会发现并处理这一情况。最后,我们会调用stop()函数来终止角色的运行。代码结尾处插入sleep(1000)的目的是为了让角色在结束之前有机会响应所有未处理的消息。最终的输出结果如下所示:

1 <br />
2 Hanks playing James Lovell<br />
3 Hanks plays no Politics<br />
4 Hanks playing Forrest Gump<br />

用Scala创建角色

在Scala中创建Akka角色时,我们没有像在Java版本中那样继承UntypedActor类,而是要继承Actor trait并实现receive()函数。下面让我们用Scala来实现之前刚刚用Java写过的HollywoodActor类:

1 <br />
2 class HollywoodActor extends Actor {<br />
3 def receive = {<br />
4 case role =&gt;<br />
5 println(&quot;Playing &quot; + role +<br />
6 &quot; from Thread &quot; + Thread.currentThread().getName())<br />
7 }<br />
8 }<br />

在上面的代码中,receive()函数实现了一个PartialFunction并采用了Scala模式匹配的形式,但为了避免分散注意力我们现在先忽略这些细节。当有消息到达时,receive()函数将被调用;如果对Scala语法还不熟悉的话,你可以暂时先把receive()函数想象成一个大的switch语句,其实现的功能与Java版本是完全相同的。

至此我们已经看到了如何定义一个角色,下面让我们把注意力集中到角色的使用上面:

01 <br />
02 object UseHollywoodActor {<br />
03 def main(args : Array[String]) :Unit = {<br />
04 val johnnyDepp = Actor.actorOf[HollywoodActor].start()<br />
05 johnnyDepp ! &quot;Jack Sparrow&quot;<br />
06 Thread.sleep(100)<br />
07 johnnyDepp ! &quot;Edward Scissorhands&quot;<br />
08 Thread.sleep(100)<br />
09 johnnyDepp ! &quot;Willy Wonka&quot;<br />
10 Actors.registry.shutdownAll<br />
11 }<br />
12 }<br />

Actor类的actorOf()函数有多个重载定义,这里我们所采用的是接受一个角色类名(即代码中的 [HollywoodActor])作为其参数的版本。在角色被创建出来之后,我们随即通过调用start()函数将其启动。在本例中,ActorRef类型的变量johnnyDepp即为我们所创建的角色实例的引用。由于Scala可以进行类型推断,所以我们可以不必在代码中明确指定johnnyDepp的类型。

接下来,我们给johnnyDepp发送了3个附带着我们希望其扮演的荧幕人物的消息。噢,稍等一下,这里有一个细节请你注意,即我们是通过特殊函数!来发送消息的。当你见到actor!message时,请从右向左阅读这个语句,就能明白这条语句的意思是把消息发送给指定的角色。这处细节再次展现了Scala在语法方面的简洁与优雅。通过这种方式,我们就无需再将发送消息的语句写成actor.!(message),而是简单地将句点和括号拿掉,简写成actor!message就行了。如果我们更喜欢Java里发送消息的那个函数,那么我们也可以把Scala简洁的语法用在Java风格的函数上,即把语句写成actor sendOneWay message。上面示例中余下的代码与之前Java版本的示例完全相同,这里就不再赘述。

下面我们将通过scalac编译器对上述代码进行编译,但首先请务必记住要把Akka库文件添加到classpath中。编译完成后,我们就可以像之前运行普通Java程序那样运行上面的scala示例程序。需要再次提醒你的是,请务必记得将所需的JARs加入到你系统的classpath中。下面是我在我的系统上所使用的编译和运行指令,请你根据你系统中Scala和Akka的安装目录来自行调整classpath中相关的路径信息:

1 <br />
2 scalac -classpath $AKKA_JARS HollywoodActor.scala UseHollywoodActor.scala<br />
3 java -classpath $AKKA_JARS com.agiledeveloper.pcj.UseHollywoodActor<br />

如果我们想要禁止日志消息输出到控制台的话,请参阅6.8节中的相关内容。在将上述示例代码编译并运行之后,我们可以看到其输出结果与之前的Java版本是非常相似的:

1 <br />
2 class HollywoodActor(val name : String) extends Actor {<br />
3 def receive = {<br />
4 case role : String =&gt; println(String.format(&quot;%s playing %s&quot;, name, role))<br />
5 case msg =&gt; println(name + &quot; plays no &quot; + msg)<br />
6 }<br />
7 }<br />

如果想在创建角色时传些参数给它,如好莱坞演员的名字等,你会发现用Scala来实现会比之前的Java版本简单很多。下面让我们先对HollywoodActor类进行改造,以使其可以接受构造函参:

1 <br />
2 class HollywoodActor(val name : String) extends Actor {<br />
3 def receive = {<br />
4 case role : String =&gt; println(String.format(&quot;%s playing %s&quot;, name, role))<br />
5 case msg =&gt; println(name + &quot; plays no &quot; + msg)<br />
6 }<br />
7 }<br />

如上所示,新版本的HollywoodActor类接受一个名为name的String类型的构造函参。而在receive()函数中,我们对于格式无法识别的消息做了专门的处理。在Scala中我们无需再使用instanceof,receive()函数中的case语句即可实现消息与各种模式之间的匹配——在本例中特指消息类型的匹配。

我们用Java创建接受一个构造函参的角色时还是花了不少力气的,但在Scala中一切变得如此简单:

01 <br />
02 object UseHollywoodActor {<br />
03 def main(args : Array[String]) : Unit = {<br />
04 val tomHanks = Actor.actorOf(new HollywoodActor(&quot;Hanks&quot;)).start()<br />
05 tomHanks ! &quot;James Lovell&quot;<br />
06 tomHanks ! new StringBuilder(&quot;Politics&quot;)<br />
07 tomHanks ! &quot;Forrest Gump&quot;<br />
08 Thread.sleep(1000)<br />
09 tomHanks.stop()<br />
10 }<br />
11 }<br />

在上面的代码中,我们先用new关键字对角色进行初始化,随后又将实例化好的对象传给actorOf()函数(这是由于Akka禁止在actorOf()函数之外随意地创建actor实例)。通过这一动作,我们就将一个继承自Actor的普通对象转换成了一个Akka角色。接下来,我们同样会给新创建的角色发送3条消息。剩下的代码与Java版本非常相似,这里就不再赘述。最后让我们运行上述示例代码,并确认其输出与Java版本是否相同:

1 <br />
2 Hanks playing James Lovell<br />
3 Hanks plays no Politics<br />
4 Hanks playing Forrest Gump<br />
时间: 2024-08-31 18:40:25

讨喜的隔离可变性(三)创建角色的相关文章

讨喜的隔离可变性-前言

曾有个的医嘱是这样说的:"如果它伤到了你,那就别再用它了".在并发编程领域,共享可变性就是那个"它". 虽然JDK的线程API使我们可以非常容易地创建线程,但如何防止线程冲突和逻辑混乱却又成了大问题.STM虽然可以解决部分问题,但是在一些类似Java这样的语言中,我们仍不得不非常小心谨慎地避免非托管可变变量和事务逻辑中产生某些副作用.而令人惊讶的是,当共享可变性消失的时候,所有那些令人纠结的问题也都随之消失了. 事实证明,在相同数据集上起多个线程互相冲突地执行是行不

讨喜的隔离可变性(六)多角色协作

在使用基于角色的编程模型时,只有当多个角色互相协作.同心协力解决问题时,我们才能真正从中获益并感受到其中的乐趣.为了更好地利用并发的威力,我们通常需要把问题拆分成若干个子问题.不同的角色可以负责不同的子问题,而我们则需要对角色之间的通信进行协调.下面我们将通过重写计算目录大小的例子来学习如何在进行多角色协作. 在4.2节中,我们写了一个计算给定目录下所有文件大小的程序.在那个例子中,我们启动了100个线程,每个线程都负责扫描不同的子目录,并在最后异步地将所有计算结果累加在一起.而本节中我们将看到

讨喜的隔离可变性(五)同时使用多个角色

声明:本文是<Java虚拟机并发编程>的第五章,感谢华章出版社授权并发编程网站发布此文,禁止以任何形式转载此文. 在使用基于角色的编程模型时,只有当多个角色互相协作.同心协力解决问题时,我们才能真正从中获益并感受到其中的乐趣.为了更好地利用并发的威力,我们通常需要把问题拆分成若干个子问题.不同的角色可以负责不同的子问题,而我们则需要对角色之间的通信进行协调.下面我们将通过重写计算目录大小的例子来学习如何在进行多角色协作. 在4.2节中,我们写了一个计算给定目录下所有文件大小的程序.在那个例子中

讨喜的隔离可变性(一)用角色实现隔离可变性

Java将OOP变成了可变性驱动(mutability-driven)的开发模式[1],而函数式编程则着重强调不可变性,而这两种极端的方式其实都是有问题的.如果每样事物都是可变的,那么我们就需要妥善处理可见性和竞争条件.而在一个真实的应用程序中,也并非所有事物都是不可变的.即使是纯函数式语言也提供了代码限制区,在该区域内允许出现带副作用的逻辑以及按顺序执行这些逻辑的方法.但无论我们倾向于哪种编程模型,避免共享可变性都是毋庸置疑的. 共享可变性--并发问题的根源所在--是指多个线程可以同时更改相同

讨喜的隔离可变性(十三)角色的特性

基于角色的并发模型降低了隔离可变性编程的难度,但该模型在适用场景上还是存在一些限制. 由于角色是通过消息来进行彼此间通信的,所以在那些没有强制不可变性的语言中,我们就必须人工来保证消息都是不可变的.传递可变消息将导致线程安全问题并最终使整个应用陷入共享可变性的险境当中,所以当手头的辅助工具还没有发展到可以帮助我们自动查验消息的不可变性之前,保证消息不可变性的重担暂时还是得由我们程序员来肩负. 角色都是各自异步运行的,彼此之前可以通过传递消息来进行协作.但某些角色的意外失败有可能导致其他角色饿死-

讨喜的隔离可变性(八)类型化角色和Murmurs

使用了类型化角色的EnergySource使我们能够以调用函数的形式来掩盖后台顺序处理异步消息的过程,在实现了线程安全的同时又可以免去显式同步的困扰.虽然创建类型化角色并不困难,但此时我们的EnergySource却还是一个丢失了关键特性的半成品--即还没有可以周期性自动补充电量的能力. 在上一章我们所实现的版本中,由于整个动作都是在后台完成,所以电量补充的动作是不需要任何用户介入的.只要我们启动了电源,就会有一个专门的timer负责每秒钟为电源增加一格电量. 然而在使用了类型化角色的版本中,实

讨喜的隔离可变性(九)混合使用角色和STM

角色可以帮助我们对可变状态进行很好地隔离.尤其是当问题能够被拆分成可以独立运行的多个并发任务.并且并发任务彼此之间都是通过消息进行异步通信时,角色的表现更佳.但是,角色并未提供对跨任务的一致性进行管理的方法.所以如果我们希望两个或多个角色的动作要么全部成功.要么全部失败,则角色就无法独立实现,而此时我们就需要通过引入STM来与角色配合完成此类功能.在本节中,我假定你已经阅读过第6章.以及本章中有关角色和类型化角色的相关内容. 我们在4.6节和6.9节中曾经实现过一个用于在两个Account之间转

讨喜的隔离可变性(二)角色的特性

角色是一种能够接收消息.处理请求以及发送响应的自由运行的活动(activity),主要被设计用来支持异步化且高效的消息传递机制. 每个角色都有一个内建的消息队列,该队列与手机上所使用的短信队列十分相似.假设Sally和Sean同时给Bob的手机发了短信,则运营商将会把这两条短信都保存起来以便Bob在方便的时候取走.类似地,基于角色的并发库允许多个角色并发地发送消息.默认情况下,消息发送者都是非阻塞的:它们会先把消息发送出去,然后再继续处理自己的业务逻辑.类库一般会让特定的角色顺序地拾取并处理消息

讨喜的隔离可变性(七)使用类型化角色

到目前为止我们所接触过的角色都是可以接收消息的,而消息的类型也是五花八门,如String.元组.case类/自定义消息等.然而发送消息的行为在感觉上与我们日常编程工作中所使用的常规函数调用还是有很大区别的,为了弥合二者之间的鸿沟,类型化角色(Typed Actor)就应运而生了.这种类型的角色可以将发送消息的动作在形式上伪装成常规的函数调用,而将消息传输动作隐藏在后台执行.我们可以将类型化角色想像成为一个活动的对象,该对象运行在一个属于自己的轻量消息驱动的线程里面,并且还带有一个用于将正常的函数