[翻译]AKKA笔记 - DEATHWATCH -7

当我们说Actor生命周期的时候,我们能看到Actor能被很多种方式停掉(用ActorSystem.stop或ActorContext.stop或发送一个PoisonPill - 也有一个killgracefulstop)。

无论Actor是怎么死的,有些情况一些系统中的其他actor想要知道。让我们举一个Actor与数据库交互的例子 - 我们叫它RepositoryActor。很明显,会有一些其他actor会向这个RepositoryActor发送消息。这些有“兴趣”的Actor很愿意留个eye或者看(watch)这个actor关闭时的消息。这个在Actor里面叫DeathWatch。这个用来watchunwatch的方法就是ActorContext.watchActorContext.unwatch。如果已经监视了,这些watcher会在Actor关闭时收到一个Terminated消息,并可以很舒服的加到他们的receive功能中。

不像Supervision有一个严格的父子继承关系,任何Actor都可以watch任何ActorSystem中的Actor。

让我们看下。

代码

QUOTEREPOSITORYACTOR

1.我们的QueryRepositoryActor格言查询Actor保存了一个quote的列表并且在收到一个QuoteRepositoryRequest时随机返回一条。

  1. 他记录了收到消息的个数,如果收到超过3个消息,他用PoisonPill把自己杀掉

这没啥神奇的。

package me.rerun.akkanotes.deathwatch

import akka.actor.{PoisonPill, Actor, ActorLogging, actorRef2Scala}
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol._
import scala.util.Random

class QuoteRepositoryActor() extends Actor with ActorLogging {

  val quotes = List(
    "Moderation is for cowards",
    "Anything worth doing is worth overdoing",
    "The trouble is you think you have time",
    "You never gonna know if you never even try")

  var repoRequestCount:Int=1

  def receive = {

    case QuoteRepositoryRequest => {

      if (repoRequestCount>3){
        self!PoisonPill
      }
      else {
        //Get a random Quote from the list and construct a response
        val quoteResponse = QuoteRepositoryResponse(quotes(Random.nextInt(quotes.size)))

        log.info(s"QuoteRequest received in QuoteRepositoryActor. Sending response to Teacher Actor $quoteResponse")
        repoRequestCount=repoRequestCount+1
        sender ! quoteResponse
      }

    }

  }

}

TEACHERACTORWATCHER

一样的,TeacherActorWatcher也没啥神奇的,除了他创建了一个QuoteRepositoryActor并且用context.watch观察。

package me.rerun.akkanotes.deathwatch

import akka.actor.{Terminated, Props, Actor, ActorLogging}
import me.rerun.akkanotes.protocols.TeacherProtocol.QuoteRequest
import me.rerun.akkanotes.protocols.QuoteRepositoryProtocol.QuoteRepositoryRequest

class TeacherActorWatcher extends Actor with ActorLogging {

  val quoteRepositoryActor=context.actorOf(Props[QuoteRepositoryActor], "quoteRepositoryActor")
  context.watch(quoteRepositoryActor)

  def receive = {
    case QuoteRequest => {
      quoteRepositoryActor ! QuoteRepositoryRequest
    }
    case Terminated(terminatedActorRef)=>{
      log.error(s"Child Actor {$terminatedActorRef} Terminated")
    }
  }
}

测试CASE

这里会有点意思。我从来没想过这个可以被测试。akka-testkit。我们会分析下这三个测试CASE:

1. 断言如果观察到已经收到Terminated消息

QuoteRepositoryActor应该在收到第四条消息时给测试case发送一条Terminated消息。前三条应该是可以的。

"A QuoteRepositoryActor" must {
    ...
    ...
    ...

    "send back a termination message to the watcher on 4th message" in {
      val quoteRepository=TestActorRef[QuoteRepositoryActor]

      val testProbe=TestProbe()
      testProbe.watch(quoteRepository) //Let's watch the Actor

      within (1000 millis) {
        var receivedQuotes = List[String]()
        (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
        receiveWhile() {
          case QuoteRepositoryResponse(quoteString) => {
            receivedQuotes = receivedQuotes :+ quoteString
          }
        }

        receivedQuotes.size must be (3)
        println(s"receiveCount ${receivedQuotes.size}")

        //4th message
        quoteRepository!QuoteRepositoryRequest
        testProbe.expectTerminated(quoteRepository)  //Expect a Terminated Message
      }
    }

2.如果没有观察(watched/unwatched)到则断言没收到Terminated消息

事实上,我们做这个只是演示下context.unwatch。如果我们移掉testProbe.watch和testProbe.unwatch这行,则测试case会运行的很正常。

    "not send back a termination message on 4th message if not watched" in {
      val quoteRepository=TestActorRef[QuoteRepositoryActor]

      val testProbe=TestProbe()
      testProbe.watch(quoteRepository) //watching

      within (1000 millis) {
        var receivedQuotes = List[String]()
        (1 to 3).foreach(_ => quoteRepository ! QuoteRepositoryRequest)
        receiveWhile() {
          case QuoteRepositoryResponse(quoteString) => {
            receivedQuotes = receivedQuotes :+ quoteString
          }
        }

        testProbe.unwatch(quoteRepository) //not watching anymore
        receivedQuotes.size must be (3)
        println(s"receiveCount ${receivedQuotes.size}")

        //4th message
        quoteRepository!QuoteRepositoryRequest
        testProbe.expectNoMsg() //Not Watching. No Terminated Message
      }
    }

3. 在TeacherActorWatcher中断言收到了Terminated消息

我们订阅了EventStream并通过检查一个特殊的日志消息来断言termination。

   "end back a termination message to the watcher on 4th message to the TeacherActor" in {

      //This just subscribes to the EventFilter for messages. We have asserted all that we need against the QuoteRepositoryActor in the previous testcase
      val teacherActor=TestActorRef[TeacherActorWatcher]

      within (1000 millis) {
        (1 to 3).foreach (_=>teacherActor!QuoteRequest) //this sends a message to the QuoteRepositoryActor

        EventFilter.error (pattern="""Child Actor .* Terminated""", occurrences = 1).intercept{
          teacherActor!QuoteRequest //Send the dangerous 4th message
        }
      }
    }

EventFilter中的pattern属性,没啥奇怪的,需要一个正则表达式。正则pattern=”“”Child Actor .* Terminated”“”用来匹配一条格式是Child Actor {Actor[akka://TestUniversityMessageSystem/user/$$d/quoteRepositoryActor#-1905987636]} Terminated日志信息。

Github

与往常一样,代码在github。看下deathwatch的包。



文章来自微信平台「麦芽面包」(微信扫描二维码关注)。未经允许,禁止转载。

时间: 2024-07-31 05:07:28

[翻译]AKKA笔记 - DEATHWATCH -7的相关文章

[翻译]AKKA笔记 - ACTOR生命周期 - 基本 -5

原文地址:http://rerun.me/2014/10/21/akka-notes-actor-lifecycle-basic/ (请注意这了讨论的生命周期并不包括 preRestart 或者postRestart方法,当我们讨论supervision时候我们会说这个) 基本的Actor生命周期很直观.除了一点小不同,你可以直接拿基本Actor生命周期与Java Servlet生命周期作比较. 像其他常规类一样,我们有一个构造函数. preStart方法会被调用. 这里你可以在postStop

[翻译]AKKA笔记 - CHILD ACTORS与ACTORPATH -6

原文:http://rerun.me/2014/10/21/akka-notes-child-actors-and-path/ Actor是完全的继承结构.你创建的任何Actor肯定都是一个其他Actor的child. 让我们分析下: PATH 我们用ActorSystem.actorof创建一个ActorRef并打印出他的path val actorSystem=ActorSystem("SupervisionActorSystem") val actorRef=actorSyste

翻译:AKKA笔记 - 介绍Actors

任何以前做过多线程的人都不会否认管理多线程程序是困难并且痛苦的. 我说管理是因为它开始很容易而且当你看到性能提升时会很兴奋.但是,当你看到你没法从子线程的错误中恢复 或者 这些僵尸bug很难重现 或者 当用性能剖析器时你发现你的线程在更新一个共享状态时阻塞了很长时间时,那真的很痛苦. 我倾向于不说Java的并发API和集合把并发编程变的更轻松和容易了,因为如果你看到这,你肯定渴望对子线程任务有更多控制或者希望更简单但又不愿意去写一堆的锁和同步代码块,而且希望对这种模式有更高层的抽象. 着这个Ak

翻译:AKKA笔记 - Actor消息 -1(一)

从第一篇Akka笔记的介绍中,我们是从很高的高度去观察Akka工具箱中的Actors.在这篇笔记的第二篇,我们会看一下Actors中的消息部分.而且延续上一次的例子,我们还会使用同样的学生与老师的例子. 在Actor消息的第一部分,我们会建立一个Teacher Actor,而且会使用一个叫StudentSimulatorApp的主程序. 回顾学生-老师模式的细节 现在考虑下StudentSimulatorApp单独发消息给TeacherActor.当我说到StudentSimulatorApp,

Akka笔记之Actor简介

英文原文链接,译文链接,原文作者:Arun Manivannan ,译者:有孚 写过多线程的人都不会否认,多线程应用的维护是件多么困难和痛苦的事.我说的是维护,这是因为开始的时候还很简单,一旦你看到性能得到提升就会欢呼雀跃.然而,当你发现很难从子任务的错误中恢复或者有些僵尸BUG很难复现再或者你的分析器显示你的线程在写入一个共享状态前大部分时间都浪费在阻塞上面的时候,痛苦降临了. 我刻意没提Java的并发API,以及它里面的集合类使得多线程编程变得多么轻松简单,因为我相信既然你们点进了这篇文章,

Akka笔记之消息传递

英文原文链接,译文链接,原文作者:Arun Manivannan ,译者:有孚 在Akka笔记第一篇的介绍中,我们大致介绍了下Akka工具箱中的Actor.在第二篇当中,我们来看一下Actor消息传递的功能.这里还是延用之前使用的那个学生-老师的例子. 在Actor消息的第一部分中,我们会创建一个老师的Actor,但学生Actor则先不创建,而是使用一个叫做StudentSimulatorApp的主程序. 仔细回顾下学生-老师模型 我们现在只考虑StudentSimulatorApp发送给Tea

Akka笔记之日志及测试

英文原文链接,译文链接,原文作者:Arun Manivannan ,译者:有孚 在前两篇笔记中(第一篇,第二篇),我们简单地介绍了一下Actor以及它的消息传递是如何工作的.在本篇中,我们将看下如何解决TeacherActor的日志打印及测试的问题. 简单回顾 前面我们的Actor是这样的: class TeacherActor extends Actor { val quotes = List( "Moderation is for cowards", "Anything

ViewState 剖析(翻译兼笔记)

笔记 ViewState 不是什么? 1. ViewState 不是用来恢复回发的控件的值.这个是通过匹配 form 中该控件的变量名而自动完成的.这个只对 Load 事件加载之前创建的控件有效.2. ViewState 不会自动重新创建任何通过代码动态创建的控件.3. 不是用来保存用户信息的.仅仅保存本页的控件状态,而不能在页面之间传递. ViewState 是什么? ViewState 用来跟踪和保存控件的状态信息.否则这些信息可能会丢失,原因可能是这些值不随着 form 回发,或者根本就不

Akka笔记之请求与响应

英文原文链接,译文链接,原文作者:Arun Manivannan ,译者:有孚 前面我们讲到了Actor的消息传递,并看到了如何发送一条fire-n-forget消息(也就是说,消息发送给Actor后我们就不管了,不从Actor那接收响应). 技术上来讲,消息发送给Actor就是希望能有副作用的.设计上便是如此.目标Actor可以不做响应,也可以做如下两件事情-- 1. 给发送方回复一条响应(在本例中,TeacherActor会将一句名言回复给StudentActor) 2. 将响应转发给其它的