Akka并发编程——第三节:Actor模型(二)

本节主要内容:

  1. Actor API解析

1. Actor API解析

Actor中的主要成员变量和方法定义如下:

package akka.actor
trait Actor extends scala.AnyRef {
  type Receive = akka.actor.Actor.Receive

  //context变量暴露当前Actor的上下文信息及当前消息
  implicit val context : akka.actor.ActorContext = { /* compiled code */ }

  //self作为当前ActorRef的引用
  implicit final val self : akka.actor.ActorRef = { /* compiled code */ }

  //当前Actor接收到最后一条消息对应的消息发送者(Actor)
  final def sender() : akka.actor.ActorRef = { /* compiled code */ }

  //receive方法,抽象方法,定义Actor的行为逻辑
  def receive : akka.actor.Actor.Receive

  //内部使用API
  protected[akka] def aroundReceive(receive : akka.actor.Actor.Receive, msg : scala.Any) : scala.Unit = { /* compiled code */ }
  protected[akka] def aroundPreStart() : scala.Unit = { /* compiled code */ }
  protected[akka] def aroundPostStop() : scala.Unit = { /* compiled code */ }
  protected[akka] def aroundPreRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ }
  protected[akka] def aroundPostRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ }

   //监督策略,用于Actor容错处理
  def supervisorStrategy : akka.actor.SupervisorStrategy = { /* compiled code */ }

  //Hook方法,用于Actor生命周期监控
  @scala.throws[T](classOf[scala.Exception])
  def preStart() : scala.Unit = { /* compiled code */ }
  @scala.throws[T](classOf[scala.Exception])
  def postStop() : scala.Unit = { /* compiled code */ }
  @scala.throws[T](classOf[scala.Exception])
  def preRestart(reason : scala.Throwable, message : scala.Option[scala.Any]) : scala.Unit = { /* compiled code */ }
  @scala.throws[T](classOf[scala.Exception])
  def postRestart(reason : scala.Throwable) : scala.Unit = { /* compiled code */ }

  //发送给Actor的消息,Actor没有定义相应的处理逻辑时,会调用此方法
  def unhandled(message : scala.Any) : scala.Unit = { /* compiled code */ }
}
object Actor extends scala.AnyRef {
  type Receive = scala.PartialFunction[scala.Any, scala.Unit]

  //空的行为逻辑
  @scala.SerialVersionUID(1)
  object emptyBehavior extends scala.AnyRef with akka.actor.Actor.Receive {
    def isDefinedAt(x : scala.Any) : scala.Boolean = { /* compiled code */ }
    def apply(x : scala.Any) : scala.Nothing = { /* compiled code */ }
  }
  //Sender为null
  @scala.SerialVersionUID(1)
  final val noSender : akka.actor.ActorRef = { /* compiled code */ }
}

(1) Hook方法,preStart()、postStop()方法的使用

/*
 *Actor API: Hook方法
 */
  object Example_05 extends App{
    import akka.actor.Actor
    import akka.actor.ActorSystem
    import akka.actor.Props

    class FirstActor extends Actor with ActorLogging{
      //通过context.actorOf方法创建Actor
      var child:ActorRef = _

      //Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作
      override def preStart(): Unit ={
        log.info("preStart() in FirstActor")
        //通过context上下文创建Actor
        child = context.actorOf(Props[MyActor], name = "myChild")
      }
      def receive = {
        //向MyActor发送消息
        case x => child ! x;log.info("received "+x)
      }

      //Hook方法,postStop(),Actor停止之后调用
      override def postStop(): Unit = {
        log.info("postStop() in FirstActor")
       }
    }

    class MyActor extends Actor with ActorLogging{
      //Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作
      override def preStart(): Unit ={
        log.info("preStart() in MyActor")
      }
      def receive = {
        case "test" => log.info("received test")
        case _      => log.info("received unknown message")
      }

      //Hook方法,postStop(),Actor停止之后调用
      override def postStop(): Unit = {
        log.info("postStop() in MyActor")
      }
    }

    val system = ActorSystem("MyActorSystem")
    val systemLog=system.log

    //创建FirstActor对象
    val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

    systemLog.info("准备向myactor发送消息")
    //向myactor发送消息
    myactor!"test"
    myactor! 123
    Thread.sleep(5000)
    //关闭ActorSystem,停止程序的运行
    system.shutdown()
  }

代码运行结果:

[INFO] [04/02/2016 17:04:49.607] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 17:04:49.607] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received 123
[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor
[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received test
[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] received unknown message
[INFO] [04/02/2016 17:04:54.616] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myChild] postStop() in MyActor
[INFO] [04/02/2016 17:04:54.617] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] postStop() in FirstActor

在代码

 class FirstActor extends Actor with ActorLogging{
      //通过context.actorOf方法创建Actor
      var child:ActorRef = _

      //Hook方法,preStart(),Actor启动之前调用,用于完成初始化工作
      override def preStart(): Unit ={
        log.info("preStart() in FirstActor")
        //通过context上下文创建Actor
        child = context.actorOf(Props[MyActor], name = "myChild")
      }
      def receive = {
        //向MyActor发送消息
        case x => child ! x;log.info("received "+x)
      }

      //Hook方法,postStop(),Actor停止之后调用,用于完成初始化工作
      override def postStop(): Unit = {
        log.info("postStop() in FirstActor")
      }
    }

中分别对postStop、preStart方法进行了重写,在preStart方法中通过代码

 child = context.actorOf(Props[MyActor], name = "myChild")

对成员变量child进行初始化,然后在postStop方法中使用

 //通过context上下文停止MyActor的运行
        context.stop(child)

停止MyActor的运行。在使用代码

//创建FirstActor对象
val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

创建FirstActor时,便会调用preStart方法完成MyActor的创建,因此首先会执行FirstActor中的preStart()方法

dispatcher-4] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor

然后在创建MyActor时执行MyActor中定义的preStart方法

[INFO] [04/02/2016 17:04:49.608] [MyActorSystem-akka.actor.default-dispatcher-2] [akka://MyActorSystem/user/firstActor/myChild] preStart() in MyActor

在执行

//关闭ActorSystem,停止程序的运行
    system.shutdown()

FirstActor作为MyActor的Supervisor,会先停止MyActor,再停止自身,因此先调用MyActor的postStop方法,再调用FirstActor的postStop方法。

(2) 成员变量self及成员方法sender方法的使用

整体代码如下:

  /*
 *Actor API:成员变量self及sender()方法的使用
 */
  object Example_05 extends App{
    import akka.actor.Actor
    import akka.actor.ActorSystem
    import akka.actor.Props

    class FirstActor extends Actor with ActorLogging{
      //通过context.actorOf方法创建Actor
      var child:ActorRef = _

      override def preStart(): Unit ={
        log.info("preStart() in FirstActor")
        //通过context上下文创建Actor
        child = context.actorOf(Props[MyActor], name = "myActor")
      }
      def receive = {
        //向MyActor发送消息
        case x => child ! x;log.info("received "+x)
      }

    }

    class MyActor extends Actor with ActorLogging{
      self!"message from self reference"
      def receive = {
        case "test" => log.info("received test");sender()!"message from MyActor"
        case "message from self reference"=>log.info("message from self refrence")
        case _      => log.info("received unknown message");
      }

    }

    val system = ActorSystem("MyActorSystem")
    val systemLog=system.log

    //创建FirstActor对象
    val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

    systemLog.info("准备向myactor发送消息")
    //向myactor发送消息
    myactor!"test"
    myactor! 123
    Thread.sleep(5000)
    //关闭ActorSystem,停止程序的运行
    system.shutdown()
  }

运行结果:

[INFO] [04/02/2016 18:40:37.805] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 18:40:37.805] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] preStart() in FirstActor
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received 123
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received test
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] message from self refrence
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received message from MyActor
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

代码:

class MyActor extends Actor with ActorLogging{
      self!"message from self reference"
      def receive = {
        case "test" => log.info("received test");sender()!"message from MyActor"
        case "message from self reference"=>log.info("message from self refrence")
        case _      => log.info("received unknown message");
      }

    }

中使用

self!"message from self reference"

向自身发送了一条消息,receive方法通过

        case "message from self reference"=>log.info("message from self refrence")

对这条消息进行处理。receive方法在处理

def receive = {
        case "test" => log.info("received test");sender()!"message from MyActor"

“test”消息时,会调用

sender()!"message from MyActor"

向sender(本例中为FirstActor)发送”message from MyActor”消息,FirstActor使用

 def receive = {
        //向MyActor发送消息
        case x => child ! x;log.info("received "+x)
      }

处理消息时又向MyActor回送该消息,因此最终的输出有两个unknown message,分别对应123和”message from MyActor”

[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message
[INFO] [04/02/2016 18:40:37.806] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor/myActor] received unknown message

(3) unhandled方法的使用

unhandled方法用于处理没有被receive方法处理的消息,下面的代码给出的是当不重写unhandled方法时的代码

/*
*Actor API:unhandled方法
*/
object Example_06 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props
  class FirstActor extends Actor with ActorLogging{
    def receive = {
      //向MyActor发送消息
      case "test" => log.info("received test")
    }

  }
  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //创建FirstActor对象
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

  systemLog.info("准备向myactor发送消息")
  //向myactor发送消息
  myactor!"test"
  myactor! 123
  Thread.sleep(5000)
  //关闭ActorSystem,停止程序的运行
  system.shutdown()
}

代码输出:

[INFO] [04/02/2016 19:14:11.992] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 19:14:11.992] [MyActorSystem-akka.actor.default-dispatcher-3] [akka://MyActorSystem/user/firstActor] received test

不难看出,对于

 myactor! 123

发送的这条消息没有被处理,没有任何的处理逻辑。在实际开发过程中,可能会对不能被处理的消息增加一些应对逻辑,此时可以重写unhandled方法,代码如下:

/*
*Actor API:unhandled方法的使用
*/
object Example_06 extends App{
  import akka.actor.Actor
  import akka.actor.ActorSystem
  import akka.actor.Props

  class FirstActor extends Actor with ActorLogging{
    def receive = {
      //向MyActor发送消息
      case "test" => log.info("received test")
    }

    //重写unhandled方法
    override def unhandled(message: Any): Unit = {
      log.info("unhandled message is {}",message)
    }
  }

  val system = ActorSystem("MyActorSystem")
  val systemLog=system.log

  //创建FirstActor对象
  val myactor = system.actorOf(Props[FirstActor], name = "firstActor")

  systemLog.info("准备向myactor发送消息")
  //向myactor发送消息
  myactor!"test"
  myactor! 123
  Thread.sleep(5000)
  //关闭ActorSystem,停止程序的运行
  system.shutdown()
}

代码输出结果:

[INFO] [04/02/2016 19:17:18.458] [main] [ActorSystem(MyActorSystem)] 准备向myactor发送消息
[INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] received test
[INFO] [04/02/2016 19:17:18.458] [MyActorSystem-akka.actor.default-dispatcher-4] [akka://MyActorSystem/user/firstActor] unhandled message is 123

其它如preRestart等方法的使用将在Akka容错部分进行讲解。




Scala学习(公众微信号:ScalaLearning)每天为大家带来一点Scala语言、Spark、Kafka、Flink、AKKA等大数据技术干货及相关技术资讯。技术永无止境,勇攀高峰,一往直前!
觉得文章不错?扫描关注

时间: 2025-01-19 19:01:54

Akka并发编程——第三节:Actor模型(二)的相关文章

Akka并发编程——第四节:Actor模型(三)

本将主要内容: 1. Actor引用.Actor路径 1. Actor引用.Actor路径 下图是Akka官方文档中给出的一张图 该图清晰地说明了ActorPath,ActorRef,Actor及ActorSystem之间的关系,并说明了Actor整体的层次结构.前面我们提到,Akka应用程序会持有一个名称为user的Actor,该Actor被称为guardian supervisor(守卫监督器),无论是ActorSystem创建的Actor还是通过ActorContext创建的Actor都为

Akka并发编程——第七节:Actor模型(六)

主要内容: 1. Typed Actor定义 2. Typed Actor创建 3. 消息发送 1. Typed Actor定义 Akka中的Typed Actor是Active Objects设计模式的实现,Active Objects模式将方法的执行和方法的调用进行解耦合,从而为程序引入并发性.Typed Actor由公用的接口和对应实现两部分构成,其后面深层次的实现使用的是代理模式,即通过使用JDK中的动态代理来实现,在调用接口的方法时自动分发到实现接口的对象上.Typed Actor的定

Akka并发编程——第六节:Actor模型(五)

本将主要内容: 1. !消息发送,Fire-and-Forget消息模型 2. ?消息发送,Send-And-Receive-Future消息模型 Akka提供了两种消息模型:fire-and-forget和Send-And-Receive-Future.fire-and-forget是一种单向消息发送模型,指的是异步发送消息,通过异步发送消息且消息发送后可以立即返回,Akka中使用?方法进行fire-and-forget消息发送,如stringActor!"Creating Actors wi

Akka并发编程——第二节:Actor模型(一)

本节主要内容 定义Actor 创建Actor 1. 定义Actor 通过扩展akka.actor.Actor 特质并实现receive方法来定义Actor,代码示例如下 //通过扩展Actor并实现receive方法来定义Actor class MyActor extends Actor { //获取LoggingAdapter,用于日志输出 val log = Logging(context.system, this) //实现receive方法,定义Actor的行为逻辑,返回的是一个偏函数

Akka并发编程——第八节:Actor模型(七)

本节主要内容 停止运行Typed Actor 当Typed Actor不再需要时要将其停止,有3种方法停止Typed Actor的运行: (1)通过system.shutdown()停止ActorSystem中所有的Typed Actor: (2)调用TypedActor(system).stop(mySquarer)停止指定的Typed Actor: (3)调用TypedActor(system).poisonPill(otherSquarer)停止指定的Typed Actor. 具体使用代码

Akka并发编程——第五节:Actor模型(四)

本节主要内容: 1. 停止Actor 1. 停止Actor (1)通过ActorSystem.shutdown方法停止所有 Actor的运行 /* *停止Actor:ActorSystem.shutdown方法 */ object Example_10 extends App{ import akka.actor.Actor import akka.actor.ActorSystem import akka.actor.Props class FirstActor extends Actor w

Akka框架——第一节:并发编程简介

本节主要内容: 1. 重要概念 2. Actor模型 3. Akka架构简介 多核处理器的出现使并发编程(Concurrent Programming)成为开发人员必备的一项技能,许多现代编程语言都致力于解决并发编程问题.并发编程虽然能够提高程序的性能,但传统并发编程的共享内存通信机制对开发人员的编程技能要求很高,需要开发人员通过自身的专业编程技能去避免死锁.互斥等待及竞争条件(Race Condition)等,熟悉Java语言并发编程的读者们对这些问题的理解会比较深刻,这些问题使得并发编程比顺

Scala入门到精通——第二十六节 Scala并发编程基础

作者:摇摆少年梦 视频地址:http://www.xuetuwuyou.com/course/12 本节主要内容 Scala并发编程简介 Scala Actor并发编程模型 react模型 Actor的几种状态 Actor深入使用解析 1. Scala并发编程简介 2003 年,Herb Sutter 在他的文章 "The Free Lunch Is Over" 中揭露了行业中最不可告人的一个小秘密,他明确论证了处理器在速度上的发展已经走到了尽头,并且将由全新的单芯片上的并行 &quo

并发编程模型

原文链接 作者: Jakob Jenkov 译者: 林威建 [weakielin@gmail.com] 并发系统可以采用多种并发编程模型来实现.并发模型指定了系统中的线程如何通过协作来完成分配给它们的作业.不同的并发模型采用不同的方式拆分作业,同时线程间的协作和交互方式也不相同.这篇并发模型教程将会较深入地介绍目前(2015年,本文撰写时间)比较流行的几种并发模型. 并发模型与分布式系统之间的相似性 本文所描述的并发模型类似于分布式系统中使用的很多体系结构.在并发系统中线程之间可以相互通信.在分