Akka并发编程——第四节:Actor模型(三)

本将主要内容:
1. Actor引用、Actor路径

1. Actor引用、Actor路径

下图是Akka官方文档中给出的一张图

该图清晰地说明了ActorPath,ActorRef,Actor及ActorSystem之间的关系,并说明了Actor整体的层次结构。前面我们提到,Akka应用程序会持有一个名称为user的Actor,该Actor被称为guardian supervisor(守卫监督器),无论是ActorSystem创建的Actor还是通过ActorContext创建的Actor都为user的子类,它是最顶级的Actor。

(一)ActorRef

对于ActorRef,我们已经很熟悉了,通过调用ActorSystem.actorOf方法可以创建Actor,返回的便是ActorRef,例如代码

 //创建FirstActor对象
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

返回的便是FirstActor的ActorRef对象,ActorRef最重要的作用便是向Actor发送消息,例如

//向myactor发送消息
myactor!"test"
myactor! 123

另外,还可以通过context隐式对象获取父Actor和子Actor的ActorRef,示例代码如下:

/*
*Actor API:成员变量self及sender()方法的使用
*/
object Example_07 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class FirstActor extends Actor with ActorLogging{
    //通过context.actorOf方法创建Actor
    var child:ActorRef = _
    override def preStart(): Unit ={
      log.info("preStart() in FirstActor")
      //通过context上下文创建Actor
      child = context.actorOf(Props[MyActor], name = "myActor")
    }
    def receive = {
      //向MyActor发送消息
      case x => child ! x;log.info("received "+x)
    }
  }

  class MyActor extends Actor with ActorLogging{
    var parentActorRef:ActorRef=_
    override def preStart(): Unit ={
      //通过context.parent获取其父Actor的ActorRef
      parentActorRef=context.parent
    }
    def receive = {
      case "test" => log.info("received test");parentActorRef!"message from ParentActorRef"
      case _      => log.info("received unknown message");
    }

  }
  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //创建FirstActor对象
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
  //获取ActorPath
  val myActorPath=system.child("firstActor")
  //通过system.actorSelection方法获取ActorRef
  val myActor1=system.actorSelection(myActorPath)
  systemLog.info("准备向myactor发送消息")
  //向myActor1发送消息
  myActor1!"test"
  myActor1! 123
  Thread.sleep(5000)
  //关闭ActorSystem,停止程序的运行
  system.shutdown()
}

代码运行结果

[INFO] [04/02/2016 20:28:08.941] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO] [04/02/2016 20:28:08.942] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received 123
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received message from ParentActorRef
[INFO] [04/02/2016 20:28:08.943] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

代码

 class MyActor extends Actor with ActorLogging{
    var parentActorRef:ActorRef=_
    override def preStart(): Unit ={
      //通过context.parent获取其父Actor的ActorRef
      parentActorRef=context.parent
    }
    def receive = {
      case "test" => log.info("received test");parentActorRef!"message from ParentActorRef"
      case _      => log.info("received unknown message");
    }

  }

中,使用

//通过context.parent获取其父Actor的ActorRef
      parentActorRef=context.parent

获取MyActor 的直接父Actor的ActorRef,在本例中为FirstActor,因为在FirstActor中,我们使用

 //通过context上下文创建Actor
      child = context.actorOf(Props[MyActor], name = "myActor")

创建了MyActor,FirstActor便自动成为MyActor的直接Supervisor。如此便可以通过代码

parentActorRef!"message from ParentActorRef"

发送消息。

另外,还可以通过

//创建FirstActor对象
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")
  //获取ActorPath
  val myActorPath=system.child("firstActor")
  //通过system.actorSelection方法获取ActorRef
  val myActor1=system.actorSelection(myActorPath)

system.actorSelection(myActorPath)方法获取相应的ActorRef。然后再使用获取到的ActorRef向Acto发送消息

 //向myActor1发送消息
  myActor1!"test"
  myActor1! 123

现在,让我们来总结一下获取ActorRef的方法:
(1)通过ActorSystem的actorOf方法,不过这种方式是通过创建Actor,然后返回其ActorRef
(2)通过context.actorOf方法,这种方式也是通过创建Actor,然后返回其ActorRef
(3)通过context.parent、context.self、context.children方法获取当前Actor的父Actor、当前Actor及子Actor的ActorRef,这种方式是获取已经创建好的Actor的ActorRef
(4)通过val myActor1=system.actorSelection(myActorPath)方法来获取ActorRef,这种方式也是获取已经创建好的Actor的ActorRef

(二)ActorPath

在前面的例子中,我们通过

 val myActorPath=system.child("firstActor")

已经使用到了ActorPath。在Akka中,ActorPath采用统一资源定位符的方式进行组织,例如

//本地ActorPath
"akka://my-sys/user/service-a/worker1"
//远程ActorPath
"akka.tcp://my-sys@host.example.com:5678/user/service-b"

本地ActorPath是Akka并发编程中的ActorPath表示方式,而远程ActorPath是Akka分布式编程中常用的ActorPath表示方式,”akka.tcp://my-sys@host.example.com:5678/user/service-b”中的TCP表示使用的是TCP协议,也可以使用UPD协议,使用UDP协议的远程ActorPath表示方式有如下形式

"akka.udp://my-sys@host.example.com:5678/user/service-b"

ActorPath当中,akka.udp表示的是使用的协议,my-sys表示的是ActorSytem名称,host.example.com:5678为远程机器地址及端口,user为guardian supervisor,service-b为当前应用程序的顶级Actor(通过system.actorOf方法创建的)

/*
*ActorPath
*/
object Example_08 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class FirstActor extends Actor with ActorLogging{
    //通过context.actorOf方法创建Actor
    var child:ActorRef = _
    override def preStart(): Unit ={
      log.info("preStart() in FirstActor")
      //通过context上下文创建Actor
      child = context.actorOf(Props[MyActor], name = "myActor")
    }
    def receive = {
      //向MyActor发送消息
      case x => child ! x;log.info("received "+x)
    }
  }

  class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test");
      case _      => log.info("received unknown message");
    }

  }
  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //创建FirstActor对象
  val firstactor = system.actorOf(Props[FirstActor], name = "firstActor")

  //获取ActorPath
  val firstActorPath=system.child("firstActor")
  systemLog.info("firstActorPath--->{}",firstActorPath)

  //通过system.actorSelection方法获取ActorRef
  val myActor1=system.actorSelection(firstActorPath)

  //直接指定其路径
  val myActor2=system.actorSelection("/user/firstActor")
  //使用相对路径
  val myActor3=system.actorSelection("../firstActor")

  systemLog.info("准备向myactor发送消息")
  //向myActor1发送消息
  myActor2!"test"
  myActor2! 123
  Thread.sleep(5000)
  //关闭ActorSystem,停止程序的运行
  system.shutdown()
}

代码运行结果:

[INFO] [04/02/2016 21:04:59.612] [main] [ActorSystem(MyActorSystem)] firstActorPath--->akka://MyActorSystem/user/firstActor
[INFO] [04/02/2016 21:04:59.612] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO] [04/02/2016 21:04:59.615] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 21:04:59.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 21:04:59.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received 123
[INFO] [04/02/2016 21:04:59.616] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor/myActor] received test
[INFO] [04/02/2016 21:04:59.616] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

本例中的重点代码如下

//获取ActorPath
  val firstActorPath=system.child("firstActor")
  systemLog.info("myActorPath--->{}",firstActorPath)

  //通过system.actorSelection方法获取ActorRef
  val myActor1=system.actorSelection(firstActorPath)

  //直接指定其路径
  val myActor2=system.actorSelection("/user/firstActor")
  //使用相对路径
  val myActor3=system.actorSelection("../firstActor")

通过 val firstActorPath=system.child(“firstActor”)构造一个ActorPath,然后使用

val myActor1=system.actorSelection(firstActorPath)

获取路径下的ActorRef,除这种方式外,还可以通过绝对路径 val myActor2=system.actorSelection(“/user/firstActor”)及相对路径 val myActor3=system.actorSelection(“../firstActor”)的方式获取ActorRef
,需要注意的是绝对路径使用的是guardian supevisor,即/user/firstActor的这种方式。

如果要获取myActor,则基方法是类型的,例如

//获取ActorPath
  val myActorPath=system.child("firstActor").child("myActor")
  systemLog.info("firstActorPath--->{}",myActorPath)

  //通过system.actorSelection方法获取ActorRef
  val myActor1=system.actorSelection(myActorPath)

  //直接指定其路径
  val myActor2=system.actorSelection("/user/firstActor/myActor")
  //使用相对路径
  val myActor3=system.actorSelection("../firstActor/myActor")

完整代码如下:

/*
*ActorPath,获取myActor
*/
object Example_09 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class FirstActor extends Actor with ActorLogging{
    //通过context.actorOf方法创建Actor
    var child:ActorRef = _
    override def preStart(): Unit ={
      log.info("preStart() in FirstActor")
      //通过context上下文创建Actor
      child = context.actorOf(Props[MyActor], name = "myActor")
    }
    def receive = {
      //向MyActor发送消息
      case x => child ! x;log.info("received "+x)
    }
  }

  class MyActor extends Actor with ActorLogging{
    def receive = {
      case "test" => log.info("received test");
      case _      => log.info("received unknown message");
    }

  }
  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //创建FirstActor对象
  val firstactor = system.actorOf(Props[FirstActor], name = "firstActor")

  //获取ActorPath
  val myActorPath=system.child("firstActor").child("myActor")
  systemLog.info("firstActorPath--->{}",myActorPath)

  //通过system.actorSelection方法获取ActorRef
  val myActor1=system.actorSelection(myActorPath)

  //直接指定其路径
  val myActor2=system.actorSelection("/user/firstActor/myActor")
  //使用相对路径
  val myActor3=system.actorSelection("../firstActor/myActor")

  systemLog.info("准备向myactor发送消息")
  //向myActor1发送消息
  myActor1!"test"
  myActor1! 123
  Thread.sleep(5000)
  //关闭ActorSystem,停止程序的运行
  system.shutdown()
}

代码运行结果:

[INFO] [04/02/2016 21:21:14.377] [main] [ActorSystem(MyActorSystem)] firstActorPath--->akka://MyActorSystem/user/firstActor/myActor
[INFO] [04/02/2016 21:21:14.377] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO] [04/02/2016 21:21:14.381] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 21:21:14.382] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test
[INFO] [04/02/2016 21:21:14.382] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

关于远程ActorRef的获取,我们将在后续章节中进行讲述。




Scala学习(公众微信号:ScalaLearning)每天为大家带来一点Scala语言、Spark、Kafka、Flink、AKKA等大数据技术干货及相关技术资讯。技术永无止境,勇攀高峰,一往直前!
觉得文章不错?扫描关注

时间: 2024-11-17 13:02:57

Akka并发编程——第四节:Actor模型(三)的相关文章

Akka并发编程——第八节:Actor模型(七)

本节主要内容 停止运行Typed Actor 当Typed Actor不再需要时要将其停止,有3种方法停止Typed Actor的运行: (1)通过system.shutdown()停止ActorSystem中所有的Typed Actor: (2)调用TypedActor(system).stop(mySquarer)停止指定的Typed Actor: (3)调用TypedActor(system).poisonPill(otherSquarer)停止指定的Typed Actor. 具体使用代码

Akka并发编程——第三节:Actor模型(二)

本节主要内容: Actor API解析 1. Actor API解析 Actor中的主要成员变量和方法定义如下: package akka.actor trait Actor extends scala.AnyRef { type Receive = akka.actor.Actor.Receive //context变量暴露当前Actor的上下文信息及当前消息 implicit val context : akka.actor.ActorContext = { /* compiled code

Akka并发编程——第七节:Actor模型(六)

主要内容: 1. Typed Actor定义 2. Typed Actor创建 3. 消息发送 1. Typed Actor定义 Akka中的Typed Actor是Active Objects设计模式的实现,Active Objects模式将方法的执行和方法的调用进行解耦合,从而为程序引入并发性.Typed Actor由公用的接口和对应实现两部分构成,其后面深层次的实现使用的是代理模式,即通过使用JDK中的动态代理来实现,在调用接口的方法时自动分发到实现接口的对象上.Typed Actor的定

Akka并发编程——第五节:Actor模型(四)

本节主要内容: 1. 停止Actor 1. 停止Actor (1)通过ActorSystem.shutdown方法停止所有 Actor的运行 /* *停止Actor:ActorSystem.shutdown方法 */ object Example_10 extends App{ import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class FirstActor extends Actor w

Akka并发编程——第六节:Actor模型(五)

本将主要内容: 1. !消息发送,Fire-and-Forget消息模型 2. ?消息发送,Send-And-Receive-Future消息模型 Akka提供了两种消息模型:fire-and-forget和Send-And-Receive-Future.fire-and-forget是一种单向消息发送模型,指的是异步发送消息,通过异步发送消息且消息发送后可以立即返回,Akka中使用?方法进行fire-and-forget消息发送,如stringActor!"Creating Actors wi

Akka v2.4.11 发布,Actor 模型开发库

Akka v2.4.11 发布了. Akka 是一个用 Scala 编写的库,用于简化编写容错的.高可伸缩性的 Java 和 Scala 的 Actor 模型应用. 改进内容,请点击此处查看.更多详情请关注发行日志和提交记录. 文章转载自 开源中国社区 [http://www.oschina.net]

Akka并发编程——第二节:Actor模型(一)

本节主要内容 定义Actor 创建Actor 1. 定义Actor 通过扩展akka.actor.Actor 特质并实现receive方法来定义Actor,代码示例如下 //通过扩展Actor并实现receive方法来定义Actor class MyActor extends Actor { //获取LoggingAdapter,用于日志输出 val log = Logging(context.system, this) //实现receive方法,定义Actor的行为逻辑,返回的是一个偏函数

Java并发编程示例(四):可控的线程中断_java

在上一节"线程中断"中,我们讲解了如何中断一个正在执行的线程以及为了中断线程,我们必须对Thread动点什么手脚.一般情况下,我们可以使用上一节介绍的中断机制.但是,如果线程实现了一个分配到多个方法中的复杂算法,或者方法调用中有一个递归调用,我们应该使用更好的方式来控制线程的中断.为此,Java提供了InterruptedException异常.当检测到中断请求时,可以抛出此异常,并且在run()方法中捕获. 在本节,我们将使用一个线程查找指定目录及其子目录下文件来演示通过使用Inte

并发编程(四):也谈谈数据库的锁机制

首先声明,本次文章基本上都是从其他人的文章中或者论坛的回复中整理而来.我把我认为的关键点提取出来供自己学习.所有的引用都附在文后,在这里也就不一一表谢了. 第二个声明,我对于Internel DB并没有研究过,所使用的也是简单的写写SQL,截止到现在最多的一个经验也就是SQL的性能调优,具体点就是通过Postgresql的执行计划,来调整优化SQL语句完成在特定场景下的数据库调优.对于锁,由于数据库支持的锁机制已经能够满足平时的开发需要.因为所从事的行业并不是互联网,没有实时性高并发的应用场景,