Akka Essentials - 1

参考Akka Essentials

 

1 Introduction to Akka

Actor Model

Actor模式的由来 
In 1973, Carl Hewitt, Peter Bishop, and Richard Steiger wrote a paper—A Universal Modular ACTOR Formalism for Artificial Intelligence, which introduced the concept of Actors. Subsequently, the Actor Model was implemented in the Erlang language by Joe Armstrong and Ericsson implemented the AXD 301 telecom switch that went onto achieve reliability of 99.9999999 percent (nine 9's).

Actor模式改变传统oo的并发策略, 如下图 
Actors change their state only when they receive a stimulus in the form of a message. 
So unlike the object-oriented world where the objects are executed sequentially, the actors execute concurrently.

 

The Actor Model is based on the following principles: 
• The immutable messages are used to communicate between actors. 
Actors do not share state, and if any information is shared, it is done via message only. 
Actors control the access to the state and nobody else can access the state. This means there is no shared, mutable state. 
不可变消息, actor不会share自己的state, 只会通过不可变消息进行通信

• Each actor has a queue attached where the incoming messages are enqueued. 
Messages are picked from the queue and processed by the actor, one at a time. An actor can respond to the received message by sending immutable messages to other actors, creating a new set of actors, updating their own state, or designating the computational logic to be used when the next message arrives (behavior change). 
每个actor会有一个queue来接收message, actor会依次处理收到的message, 并发送消息给其他的actors

• Messages are passed between actors asynchronously. 
It means that the sender does not wait for the message to be received and can go back to its execution immediately. Any actor can send a message to another actor with no guarantee on the sequence of the message arrival and execution. 
消息是异步发送, actor不会保证消息到达或执行的顺序

• Communication between the sender and receiver is decoupled and asynchronous, allowing them to execute in different threads. 
By having invocation and execution in separate threads coupled with no shared state, allows actors to provide a concurrent and scalable model. 
这种低耦合和异步的方式, 很容易进行concurrent和扩展

 

Akka framework

The Akka framework has taken the Actor Model concept to build an event-driven, middleware framework that allows building concurrent, scalable, distributed systems. Akka uses the Actor Model to raise the abstraction level that decouples the business logic from low-level constructs of threads, locks, and non-blocking I/O. 

Akka其实就是一种达到工业级别的Actor编程框架, 另外具有如下特性

The Akka framework provides the following features: 
• Concurrency: Akka Actor Model abstracts the concurrency handling and allows the programmer to focus on the business logic. 
• Scalability: Akka Actor Model's asynchronous message passing allows applications to scale up on multicore servers. 
• Fault tolerance: Akka borrows the concepts and techniques from Erlang to build a "Let It Crash" fault-tolerance model using supervisor hierarchies to allow applications to fail fast and recover from the failure as soon as possible. 
• Event-driven architecture: Asynchronous messaging makes Akka a perfect platform for building event-driven architectures. 
• Transaction support: Akka implements transactors that combine actors and software transactional memory (STM) into transactional actors. This allows composition of atomic message flows with automatic retry and rollback. 
• Location transparency: Akka treats remote and local process actors the same, providing a unified programming model for multicore and distributed computing needs. 
• Scala/Java APIs: Akka supports both Java and Scala APIs for building applications.

 

What is an actor?

Actor is modeled as the object that encapsulates state and behavior
All the messages intended for the actors are parked in a queue and actors process the messages from that queue

 

State and Behavior

这个很容易理解, 如下图

actor需要确保state不会丢失, crash, 并可以被concurrency访问 
Akka implements actors as a reactive, event-driven, lightweight thread that shields and protects the actor's state. Actors provide the concurrent access to the state allowing us to write programs without worrying about concurrency and locking issues. 
When the actors fail and are restarted, the actors' state is reinitialized to make sure that the actors behave in a consistent manner with a consistent state.

 

Mailbox

用于存放收到message的地方, Akka提供多种类型的mailbox, 有限, 无限, 优先级

Akka provides multiple mailbox implementations. The mailboxes can be bounded or unbounded. 
Akka provides a priority mailbox where the messages are enqueued based on the assigned priority.

 

Actor lifecycle 
Actor定义preStart和postStop接口用于初始化和善后 
Every actor that is defined and created has an associated lifecycle. 
Akka provides hooks such as preStart that allow the actor's state and behavior to be initialized. 
When the actor is stopped, Akka disables the message queuing for the actor before PostStop is invoked. In the postStop hook, any persistence of the state or clean up of any hold-up resources can be done:

 

Fault tolerance

首先是actor hierarchy, 看看右图以项目管理为例 
分而治之, 每个actor都有自己明确的职责, 不会去干涉和了解别人的工作, 大家通过message或输出物来沟通 
同时PM需要监督和管理项目进行, 如果actor发现自己处理不了的message或error, 需要向上级反映以得到帮助 
并且Akka actor都只会有一个supervisor

Akka的fault-tolerance就是基于actor hierarchy和supervisor model

 

The whole idea is to break down the task into smaller tasks to the point where the task is granular and structured enough to be performed by one actor. 
Each actor knows which kind of message it will process and how he reacts in terms of failure. 
So, if the actor does not know how to handle a particular message or an abnormal runtime behavior, the actor asks its supervisor for help. The recursive actor hierarchy allows the problem to be propagated upwards to the point where it can be handled. 
Remember, every actor in Akka has one and only one supervisor. 
This actor hierarchy forms the basis of the Akka's "Let It Crash" fault-tolerance model. 
Akka's fault-tolerance model is built using the actor hierarchy and supervisor model.

 

Location transparency

Akka通过配置来透明化actor的位置 
Akka uses configuration to indicate whether the actor is running locally or on a remote machine. 
Akka uses the actor hierarchy and combines it with the actor system address to make each actor identifiable and reachable.

 

总结

As we move ahead and delve deep into the constructs provided by the Akka framework, we need to make sure that we keep in mind the following concepts: 
• An actor is a computation unit with state, behavior, and its own mailbox 
• There are two types of actors—untyped and typed 
• Communication between actors can be asynchronous or synchronous 
• Message passing to the actors happens using dispatchers 
• Actors are organized in a hierarchy via the actor system 
• Actors are proxied via ActorRef 
• Supervisor actors are used to build the fault-tolerance mechanism 
• Actor path follows the URL scheme, which enables location transparency 
• STM is used to provide transactional support to multiple actor state updates

 

2 Starting with Akka

这章写的非常好, 非常详细的描述的Akka的环境搭建, 需要可以参考, 这儿就不列了 
并且通过一个例子来告诉你Akka怎么用, 最给力的是给出了Java和Scala两个版本, 估计比一下, 谁都想把java扔到垃圾堆去

 

要解决的问题很简单, 对句子分词, 统计, 最后aggregate的过程

对应到Actor基本就是下面的过程,

看看scala怎么实现的, 很清晰, 一下就对Akka有个全局的概念

 

定义Message类, 为了后面用模式匹配, 加上case

sealed trait MapReduceMessage
case class WordCount(word: String, count: Int) extends MapReduceMessage
case class MapData(dataList: ArrayBuffer[WordCount]) extends MapReduceMessage
case class ReduceData(reduceDataMap: Map[String, Int]) extends MapReduceMessage
case class Result extends MapReduceMessage 

 

MapActor.scala

package akka.first.app.mapreduce.actors
import akka.actor.Actor
import akka.actor.ActorRef
import akka.first.app.mapreduce.MapData
import akka.first.app.mapreduce.WordCount
import scala.collection.mutable.ArrayBuffer
class MapActor extends Actor {
    val STOP_WORDS_LIST = List("a", "am", "......", "to") //stopword,列出部分
    val defaultCount: Int = 1
    def receive: Receive = {
        case message: String =>  //匹配string,即原始的句子
            sender ! evaluateExpression(message) //将结果发送给reduceActor
    }
    def evaluateExpression(line: String): MapData = MapData {
        line.split("""\s+""").foldLeft(ArrayBuffer.empty[WordCount]) {//比较奇葩的操作, 类似reduce
            (index, word) =>
                if(!STOP_WORDS_LIST.contains(word.toLowerCase))
                    index += WordCount(word.toLowerCase, 1)
                else
                    index
        }
    }
}

 

ReduceActor.scala

package akka.first.app.mapreduce.actors
import scala.collection.immutable.Map
import akka.actor.Actor
import akka.first.app.mapreduce.MapData
import akka.first.app.mapreduce.ReduceData
import akka.first.app.mapreduce.WordCount
class ReduceActor extends Actor {
    def receive: Receive = {
        case MapData(dataList) =>  //匹配MapData message类
            sender ! reduce(dataList) //发送到aggregator
    }
    def reduce(words: IndexedSeq[WordCount]): ReduceData = ReduceData {
        words.foldLeft(Map.empty[String, Int]) { (index, words) => //又使用foldLeft
            if (index contains words.word)
                index + (words.word -> (index.get(words.word).get + 1))
            else
                index + (words.word -> 1)
        }
    }
}

 

AggregateActor.scala

package akka.first.app.mapreduce.actors
import scala.collection.immutable.Map
import scala.collection.mutable.HashMap
import akka.actor.Actor
import akka.first.app.mapreduce.ReduceData
import akka.first.app.mapreduce.Result
class AggregateActor extends Actor {
    val finalReducedMap = new HashMap[String, Int] //存放统计结果
    def receive: Receive = { //需要处理两种message
        case ReduceData(reduceDataMap) => //ReduceData Message, 需要aggregate
            aggregateInMemoryReduce(reduceDataMap)
        case Result => //结果请求
            sender ! finalReducedMap.toString()
    }
    def aggregateInMemoryReduce(reducedList: Map[String, Int]): Unit = {
        for ((key,value) <- reducedList) {
            if (finalReducedMap contains key)
                finalReducedMap(key) = (value + finalReducedMap.get(key).get)
            else
                finalReducedMap += (key -> value)
        }
    }
}

 

MasterActor.scala

Supervisor负责协调各个actors

package akka.first.app.mapreduce.actors
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.first.app.mapreduce._
import akka.routing.RoundRobinRouter
class MasterActor extends Actor {
    //定义各个actor, 用context.actorOf
    val mapActor = context.actorOf(Props[MapActor].withRouter(
            RoundRobinRouter(nrOfInstances = 5)), name = "map")
    val reduceActor = context.actorOf(Props[ReduceActor].withRouter(
            RoundRobinRouter(nrOfInstances = 5)), name = "reduce")
    val aggregateActor = context.actorOf(Props[AggregateActor], name = "aggregate")
    def receive: Receive = {
        case line: String =>
            mapActor ! line
        case mapData: MapData =>
            reduceActor ! mapData
        case reduceData: ReduceData =>
            aggregateActor ! reduceData
        case Result =>
            aggregateActor forward Result
    }
}

 

MapReduceApplication.scala

整个执行程序

package akka.first.app.mapreduce
import scala.collection.immutable.Map
import scala.collection.mutable.ArrayBuffer
import akka.actor.actorRef2Scala
import akka.actor.ActorSystem
import akka.actor.Props
import akka.dispatch.Await
import akka.first.app.mapreduce.actors.MasterActor
import akka.pattern.ask
import akka.util.duration.intToDurationInt
import akka.util.Timeout

sealed trait MapReduceMessage
case class WordCount(word: String, count: Int) extends MapReduceMessage
case class MapData(dataList: ArrayBuffer[WordCount]) extends MapReduceMessage
case class ReduceData(reduceDataMap: Map[String, Int]) extends MapReduceMessage
case class Result extends MapReduceMessage

object MapReduceApplication extends App {
    val _system = ActorSystem("MapReduceApp")
    val master = _system.actorOf(Props[MasterActor], name = "master")  //创建master actor
    implicit val timeout = Timeout(5 seconds)
    master ! "The quick brown fox tried to jump over the lazy dog and fell on the dog"
    master ! "Dog is man's best friend"
    master ! "Dog and Fox belong to the same family"
    Thread.sleep(500)
    val future = (master ? Result).mapTo[String]
    val result = Await.result(future, timeout.duration)
    println(result)
    _system.shutdown
}

本文章摘自博客园,原文发布日期:2013-11-15
时间: 2024-08-08 14:10:23

Akka Essentials - 1的相关文章

Akka Essentials - 2

Actors Defining an actor class MyActor extends Actor { def receive = { } } In Scala, the receive block is actually a partial function, which allows the usage of pattern matching syntax. Creating actors Actor with default constructor 使用actorOf创建actor,

Akka书箱推荐

话不多说,推荐下面几本书: 1. Akka Essentials, By Munish K. Gupta, Publisher: Packt Publishing,Release Date: October 2012.非常适合入门级,图文并茂,看完能够明白AKKA的大部分精髓 2. Effective Akka,By Jamie Allen,Publisher: O'Reilly Media,Release Date: August 2013.讲解AKKA最佳实践,适合有一定开发经验的AKKA使

技术书单整理

算法 算法导论 Introduction to Algorithms, Second Edition, by Thomas H. Cormen, Charles E. Leiserson, Ronald L. Rivest and Clifford Stein 算法概论  Algorithms, S. Dasgupta, C. H. Papadimitriou, and U. V. Vazirani Python Algorithms-Mastering Basic Algorithms in

Akka笔记之日志及测试

英文原文链接,译文链接,原文作者:Arun Manivannan ,译者:有孚 在前两篇笔记中(第一篇,第二篇),我们简单地介绍了一下Actor以及它的消息传递是如何工作的.在本篇中,我们将看下如何解决TeacherActor的日志打印及测试的问题. 简单回顾 前面我们的Actor是这样的: class TeacherActor extends Actor { val quotes = List( "Moderation is for cowards", "Anything

Akka笔记之Actor简介

英文原文链接,译文链接,原文作者:Arun Manivannan ,译者:有孚 写过多线程的人都不会否认,多线程应用的维护是件多么困难和痛苦的事.我说的是维护,这是因为开始的时候还很简单,一旦你看到性能得到提升就会欢呼雀跃.然而,当你发现很难从子任务的错误中恢复或者有些僵尸BUG很难复现再或者你的分析器显示你的线程在写入一个共享状态前大部分时间都浪费在阻塞上面的时候,痛苦降临了. 我刻意没提Java的并发API,以及它里面的集合类使得多线程编程变得多么轻松简单,因为我相信既然你们点进了这篇文章,

Microsoft Security Essentials 的新增功能

下面几节将详细介绍这些功能. Windows 防火墙集成 Windows 防火墙有助于阻止攻击者或恶意软件访问您的电脑.安装 Microsoft Security Essentials 时,安装向导会验证 Windows 防火墙是否已打开.如果您已有意关闭 Windows 防火墙,可以通过清除复选框将其关闭.您可以随时通过控制面板中的"系统和安全"设置更改 Windows 防火墙设置. 网络检查系统 如果网络易受攻击,那么连接到网络时,电脑也容易受到攻击.网络漏洞的研究显示,从初次报告

如何找回丢失的Microsoft Security Essentials右键扫描菜单

  有些用户在安装MSE后,右击文件/文件夹没有"使用Microsoft Security Essentials扫描"的选项,这个问题可能是由安装了修改版的操作系统或使用了其他优化类软件导致注册表键值被篡改,按照下面的方法来进行修复吧! 创建相关注册表键值 打开记事本,复制粘贴以下注册表信息: Windows Registry Editor Version 5.00 [HKEY_CLASSES_ROOT*shellexContextMenuHandlersEPP] @="{0

最锋利的Visual Studio Web开发工具扩展:Web Essentials详解

原文:最锋利的Visual Studio Web开发工具扩展:Web Essentials详解  Web Essentials是目前为止见过的最好用的VS扩展工具了,具体功能请待我一一道来.   首先,从Extension Manager里安装:最新版本是19号发布的2.5版 然后重启你的VS开发环境,就可以使用它提供的方便功能了. Web Essentials对CSS.JavaScript和HTML都提供了很多快捷的功能支持,具体列表如下: CSS 即时预览Live Web Preview每次

AKKA文档(java版)—角色

原文地址  译者:Zhanggc      审校:吴京润 角色 角色模型对编写并发.分布式系统进行了高度抽象.它减轻了开发者必须对互斥锁与线程管理的负担,更容易编写出正确的并发与并行系统.早在1973 年 Carl Hewitt 发表的论文中定义了角色,但一直流行于Erlang 语言中,随后被爱立信公司应用于建立高并发.可靠通信系统,取得了巨大成功. Akka 框架里面角色的API 跟Scala 框架里面角色相似,后者一些语法曾经模仿Erlang语言. 创建角色 注意:由于Akka强迫父级监管者