讨喜的隔离可变性(十)使用Transactor

声明:本文是《Java虚拟机并发编程》的第五章,感谢华章出版社授权并发编程网站发布此文,禁止以任何形式转载此文。

Akka transactor或事务角色为我们提供了一种将多个角色的执行过程合并到一个事务中的方法。顾名思义,transactor可以将多个角色对于托管STM Ref对象的更改变成原子操作,即仅当外围事务提交成功之后,对于那些托管对象的变更才能生效,否则所有的变更都会被丢弃。

Transactor提供了三种处理消息的方法:

默认情况下,Transactor会在其自己的事务中处理消息。
实现normally()函数。该函数不属于任何事物,其主要功能是独立地处理我们所选择的消息。
申请让消息被协调处理,即使其作为总控事务的一部分来执行。
总体而言,Transactor为我们提供了将其他角色链接到我们的协调事务里的弹性。此外,transactor还提供了前置和后置于事务的可选函数,以便于我们可以提前为事务做好准备或执行某些后置提交操作。

还是老规矩,我们先用Java创建一个transactor,然后再用Scala实现一遍。

在Java中使用Transactor

为了能够在Java中使用transactor,我们需要继承UntypedTransactor类并实现atomically()函数。除此之外,如果我们想要在事务中包含其他角色,则还需要实现coordinate()函数。下面就让我们用transactor来重新实现账户转账的例子。首先还是从我们将会用到的消息类开始说起。

在新版的示例中,我们定义了一个含有存款金额字段的不可变类Deposit,并将该类作为驱动存款操作执行的消息。

public class Deposit {
public final int amount;
public Deposit(final int theAmount) { amount = theAmount; }
}

接下来,我们还需要定义一个与Deposit结构完全相同的消息类Withdraw:

public class Withdraw {
public final int amount;
public Withdraw(final int theAmount) { amount = theAmount; }
}

对于获取账户余额的请求消息而言,由于本身不需要携带任何数据,所以我们只需定义一个空类即可:

public class FetchBalance {}

与此相对地,针对上述请求消息的响应消息Balance则是一个含有有实际账户余额的不可变类:

public class Balance {
public final int amount;
public Balance(final int theBalance) { amount = theBalance; }
}

最后,我们还需要定义Transfer消息,该消息将会包含转账操作的源账户和目的账户以及待转金额:

public class Transfer {
public final ActorRef from;
public final ActorRef to;
public final int amount;
public Transfer(final ActorRef fromAccount,
final ActorRef toAccount, final int theAmount) {
from = fromAccount;
to = toAccount;
amount = theAmount;
}
}

在本例中,AccountService tranactor将会用到Transfer消息,而Account transactor则会使用我们刚才定义的其他消息。下面让我们先来看一下Account transactor的实现代码:

public class Account extends UntypedTransactor {
private final Ref<Integer> balance = new Ref<Integer>(0);
public void atomically(final Object message) {
if(message instanceof Deposit) {
int amount = ((Deposit)(message)).amount;
if (amount > 0) {
balance.swap(balance.get() + amount);
System.out.println("Received Deposit request " + amount);
Using Transactors • 203
}
}
if(message instanceof Withdraw) {
int amount = ((Withdraw)(message)).amount;
System.out.println("Received Withdraw request " + amount);
if (amount > 0 && balance.get() >= amount)
balance.swap(balance.get() - amount);
else {
System.out.println("...insufficient funds...");
throw new RuntimeException("Insufficient fund");
}
}
if(message instanceof FetchBalance) {
getContext().replySafe(new Balance(balance.get()));
}
}
}

Account类继承自UntypedTransactor并且实现了atomically()函数。该函数将会运行在一个给定事务的上下文环境中——这里的事务可以是调用方所在的事务,如果没显式给出的话也可能是一个独立的事务。在atomically()函数中,如果接收到的消息类型为Deposit,我们就会把存款的数额与当前余额相加之后保存在STM托管的Ref对象中。如果接收到的消息类型为Withdraw并且当前余额大于取款金额时,我们才会在balance中减去取款金额,否则我们会抛出一个异常。而一旦抛出异常,该行为将会触发外围事务的回滚。最后,如果收到的消息类型为FetchBalance,我们只需把当前账户余额balance的值返回给发送方即可。由于整个函数都是在一个事务中运行的,所以在一个transactor中对于Ref对象进行多次访问是没关系的。而仅当外围事务被提交之后,我们对Ref对象所做的变更才能生效,请记住,示例中所涉及的不可变状态是需要我们人工维护的。

下面我们将实现AccountService transactor,其主要功能就是负责协调目标账户(transactor)上的存款操作和源账户(另一个transactor)上的取款操作,实现代码如下所示:

public class AccountService extends UntypedTransactor {
@Override public Set<SendTo> coordinate(final Object message) {
if(message instanceof Transfer) {
Set<SendTo> coordinations = new java.util.HashSet<SendTo>();
Transfer transfer = (Transfer) message;
coordinations.add(sendTo(transfer.to, new Deposit(transfer.amount)));
204 • Chapter 8. Favoring Isolated Mutability
coordinations.add(sendTo(transfer.from,
new Withdraw(transfer.amount)));
return java.util.Collections.unmodifiableSet(coordinations);
}
return nobody();
}
public void atomically(final Object message) {}
}

由于AccountService的唯一职责就是协调存取款操作,所以在coordinate()函数中,我们需要将合适的消息分别发送给源账户和目标账户。为了实现这一目的,我们需要将角色以及每个角色所对应的消息都聚集在一个集合中。当我们将该集合自coordinate()函数返回给调用方时,AccountService transactor的父类将会把合适的消息发往集合中的每一个transactor。而一旦消息被发出,则其自身的automically()实现将会被调用。但由于这里我们没有额外的事情要做,所以就只写了一个空的atomically()函数。

下面让我们写一些的测试代码来检验上述这些transactor的功能:

public class UseAccountService {
public static void printBalance(
final String accountName, final ActorRef account) {
Balance balance =
(Balance)(account.sendRequestReply(new FetchBalance()));
System.out.println(accountName + " balance is " + balance.amount);
}
public static void main(final String[] args)
throws InterruptedException {
final ActorRef account1 = Actors.actorOf(Account.class).start();
final ActorRef account2 = Actors.actorOf(Account.class).start();
final ActorRef accountService =
Actors.actorOf(AccountService.class).start();
account1.sendOneWay(new Deposit(1000));
account2.sendOneWay(new Deposit(1000));
Thread.sleep(1000);
printBalance("Account1", account1);
printBalance("Account2", account2);
System.out.println("Let's transfer $20... should succeed");
accountService.sendOneWay(new Transfer(account1, account2, 20));
Using Transactors • 205
Thread.sleep(1000);
printBalance("Account1", account1);
printBalance("Account2", account2);
System.out.println("Let's transfer $2000... should not succeed");
accountService.sendOneWay(new Transfer(account1, account2, 2000));
Thread.sleep(6000);
printBalance("Account1", account1);
printBalance("Account2", account2);
Actors.registry().shutdownAll();
}
}

就交互和使用方法而言,角色和transactor实际是没什么区别的。如果我们将一个普通的消息(如new Deposit(1000))发送给transactor,则该消息将会自动被包裹到一个事务中。此外,我们也可以通过创建akka.transactor.Coordinated实例并把消息包裹进去(例如,new Coordinated(new Deposit(1000)))的方法来构建我们自己的协调事务。在本例中,由于我们只处理单向消息,所以在执行下一步查询之前我们都会插入一些延时以便使消息处理能够彻底完成。这种做法为协调事务成功执行或失败回滚提供了时间,同时也便于我们从随后的打印函数中观察到事务执行的效果。

只有在相关transactor的消息全部成功处理完之后,协调事务才能提交,其中协调请求的等待时间至多为事务超时时间(可配置)。上述测试代码的输出结果如下所示:

Received Deposit request 1000
Received Deposit request 1000
Account1 balance is 1000
Account2 balance is 1000
Let's transfer $20... should succeed
Received Deposit request 20
Received Withdraw request 20
Account1 balance is 980
Account2 balance is 1020
Let's transfer $2000... should not succeed
Received Withdraw request 2000
...insufficient funds...
Received Deposit request 2000
Account1 balance is 980
Account2 balance is 1020

从输出结果中我们可以看到,前两次转存款操作和第一次转账操作都干脆利落地完成,而第二次转账操作则由于待转金额大于源账户的当前余额而失败。所以虽然转账操作的存款步骤顺利完成(由于存款和取款动作是并发执行的,所以我们从输出结果中所看到的最后一次转账操作的执行步骤可能每次都不一样),但取款步骤却没有成功,从而导致整个转账操作失败并回滚。从最后两条输出结果我们看出,第二次转账的存款步骤所产生的变更被丢弃,两个账户的余额又恢复到第二次转账之前的状态。

在Scala中使用Transactor

为了在Java中使用transactor,我们需要继承UntypedTransactor类并实现其atomically()函数。而如果我们想要在事务中包含其他角色,则还需要实现coordinate()函数。接下来我们会将上面的示例从Java翻译成Scala,首先我们还是从一些消息类的定义开始入手,如下所示,这些消息类用Scala的case类实现起来非常简洁。

case class Deposit(val amount : Int)
case class Withdraw(val amount : Int)
case class FetchBalance()
case class Balance(val amount : Int)
case class Transfer(val from : ActorRef, val to : ActorRef, val amount : Int)
Next let’s translate the Account transactor to Scala. We can use pattern
matching to handle the three messages.
class Account extends Transactor {
val balance = Ref(0)
def atomically = {
case Deposit(amount) =>
if (amount > 0) {
balance.swap(balance.get() + amount)
println("Received Deposit request " + amount)
}
case Withdraw(amount) =>
println("Received Withdraw request " + amount)
if (amount > 0 && balance.get() >= amount)
balance.swap(balance.get() - amount)
else {
println("...insufficient funds...")
Using Transactors • 207
throw new RuntimeException("Insufficient fund")
}
case FetchBalance =>
self.replySafe(Balance(balance.get()))
}
}

接下来,我们需要把Account transactor翻译成Scala的实现方式,这里我们可以使用模式匹配来处理上面定义的三种消息。

class Account extends Transactor {
val balance = Ref(0)
def atomically = {
case Deposit(amount) =>
if (amount > 0) {
balance.swap(balance.get() + amount)
println("Received Deposit request " + amount)
}
case Withdraw(amount) =>
println("Received Withdraw request " + amount)
if (amount > 0 && balance.get() >= amount)
balance.swap(balance.get() - amount)
else {
println("...insufficient funds...")
Using Transactors • 207
throw new RuntimeException("Insufficient fund")
}
case FetchBalance =>
self.replySafe(Balance(balance.get()))
}
}

下面让我们一起来翻译AccountService transactor。这里我们仍然将atomically()函数置空,同时我们在coordinate()函数中指定了哪些对象是需要参与到事务执行过程中的。与Java版的代码相比,Scala这边的实现代码在语法上要更加简洁:

class AccountService extends Transactor {
override def coordinate = {
case Transfer(from, to, amount) =>
sendTo(to -> Deposit(amount), from -> Withdraw(amount))
}
def atomically = { case message => }
}

最后,我们用下面的测试代码来检验这些transactor的运行情况:

object UseAccountService {
def printBalance(accountName : String, account : ActorRef) = {
(account !! FetchBalance) match {
case Some(Balance(amount)) =>
println(accountName + " balance is " + amount)
case None =>
println("Error getting balance for " + accountName)
}
}
def main(args : Array[String]) = {
val account1 = Actor.actorOf[Account].start()
val account2 = Actor.actorOf[Account].start()
val accountService = Actor.actorOf[AccountService].start()
account1 ! Deposit(1000)
account2 ! Deposit(1000)
Thread.sleep(1000)
printBalance("Account1", account1)
printBalance("Account2", account2)
208 • Chapter 8. Favoring Isolated Mutability

虽然上述代码只是Java版本对应代码的直译,但在这里我们再次见证了Scala在语法简洁方面的优势。通过观察下面的输出结果我们可以看到,Scala版示例的行为与Java版本是完全相同的。

Received Deposit request 1000
Received Deposit request 1000
Account1 balance is 1000
Account2 balance is 1000
Let's transfer $20... should succeed
Received Deposit request 20
Received Withdraw request 20
Account1 balance is 980
Account2 balance is 1020
Let's transfer $2000... should not succeed
Received Deposit request 2000
Received Withdraw request 2000
...insufficient funds...
Account1 balance is 980
Account2 balance is 1020

通过上面两个示例,我们学习了如何在Java和Scala中实现transactor。Transactor集角色与STM的优点于一身,并支持多个独立运行角色之间的一致性。与STM的使用场景类似,transactor非常适用于那些写冲突非常不频繁的应用程序。理想情况下,如果多个角色需要进行某种形式的投票以作出某项决定,则用transactor来实现将会非常方便。
文章转自 并发编程网-ifeve.com

时间: 2024-10-25 21:46:43

讨喜的隔离可变性(十)使用Transactor的相关文章

讨喜的隔离可变性(十二)基于角色模型的局限性和小结

声明:本文是<Java虚拟机并发编程>的第五章,感谢华章出版社授权并发编程网站发布此文,禁止以任何形式转载此文. 截至目前我们所写的关于角色的例子中,所有角色及其客户端都运行于同一JVM进程中.但在现实生活中,有一部分开发者认为角色也应该像在Erlang中那样被用于进程间通信.而另一部分开发者则像我们在前面所演示的那样只将其应用于进程内通信.值得说明的一点是,Scala和Akka同时兼顾了这两个阵营的需求. 在Akka中,远程角色的用法与进程内角色的用法十分相似,唯一的区别就在于我们如何访问角

讨喜的隔离可变性-前言

曾有个的医嘱是这样说的:"如果它伤到了你,那就别再用它了".在并发编程领域,共享可变性就是那个"它". 虽然JDK的线程API使我们可以非常容易地创建线程,但如何防止线程冲突和逻辑混乱却又成了大问题.STM虽然可以解决部分问题,但是在一些类似Java这样的语言中,我们仍不得不非常小心谨慎地避免非托管可变变量和事务逻辑中产生某些副作用.而令人惊讶的是,当共享可变性消失的时候,所有那些令人纠结的问题也都随之消失了. 事实证明,在相同数据集上起多个线程互相冲突地执行是行不

讨喜的隔离可变性(一)用角色实现隔离可变性

Java将OOP变成了可变性驱动(mutability-driven)的开发模式[1],而函数式编程则着重强调不可变性,而这两种极端的方式其实都是有问题的.如果每样事物都是可变的,那么我们就需要妥善处理可见性和竞争条件.而在一个真实的应用程序中,也并非所有事物都是不可变的.即使是纯函数式语言也提供了代码限制区,在该区域内允许出现带副作用的逻辑以及按顺序执行这些逻辑的方法.但无论我们倾向于哪种编程模型,避免共享可变性都是毋庸置疑的. 共享可变性--并发问题的根源所在--是指多个线程可以同时更改相同

讨喜的隔离可变性(六)多角色协作

在使用基于角色的编程模型时,只有当多个角色互相协作.同心协力解决问题时,我们才能真正从中获益并感受到其中的乐趣.为了更好地利用并发的威力,我们通常需要把问题拆分成若干个子问题.不同的角色可以负责不同的子问题,而我们则需要对角色之间的通信进行协调.下面我们将通过重写计算目录大小的例子来学习如何在进行多角色协作. 在4.2节中,我们写了一个计算给定目录下所有文件大小的程序.在那个例子中,我们启动了100个线程,每个线程都负责扫描不同的子目录,并在最后异步地将所有计算结果累加在一起.而本节中我们将看到

讨喜的隔离可变性(十三)角色的特性

基于角色的并发模型降低了隔离可变性编程的难度,但该模型在适用场景上还是存在一些限制. 由于角色是通过消息来进行彼此间通信的,所以在那些没有强制不可变性的语言中,我们就必须人工来保证消息都是不可变的.传递可变消息将导致线程安全问题并最终使整个应用陷入共享可变性的险境当中,所以当手头的辅助工具还没有发展到可以帮助我们自动查验消息的不可变性之前,保证消息不可变性的重担暂时还是得由我们程序员来肩负. 角色都是各自异步运行的,彼此之前可以通过传递消息来进行协作.但某些角色的意外失败有可能导致其他角色饿死-

讨喜的隔离可变性(五)同时使用多个角色

声明:本文是<Java虚拟机并发编程>的第五章,感谢华章出版社授权并发编程网站发布此文,禁止以任何形式转载此文. 在使用基于角色的编程模型时,只有当多个角色互相协作.同心协力解决问题时,我们才能真正从中获益并感受到其中的乐趣.为了更好地利用并发的威力,我们通常需要把问题拆分成若干个子问题.不同的角色可以负责不同的子问题,而我们则需要对角色之间的通信进行协调.下面我们将通过重写计算目录大小的例子来学习如何在进行多角色协作. 在4.2节中,我们写了一个计算给定目录下所有文件大小的程序.在那个例子中

讨喜的隔离可变性(十一)调和类型化角色

正如我们在8.7节中所看到的那样,类型化角色是吸取了面向对象的程序设计和基于角色的程序设计二者的精华所孕育出来的新编程模型.该编程模型集所有我们耳熟能详的方法于一身,既可以方便地使用函数调用,又能享受角色所带来的好处.所以,在一个OO应用程序中,相比起普通的角色,我们可能会更倾向于使用类型化的角色.然而与普通角色相类似的是,类型化角色也是各自独立运行且不提供彼此间事务协调功能的,下面我们将会使用调和类型化角色(coordinating typed actor)来解决这个问题. Akka帮助我们简

讨喜的隔离可变性(八)类型化角色和Murmurs

使用了类型化角色的EnergySource使我们能够以调用函数的形式来掩盖后台顺序处理异步消息的过程,在实现了线程安全的同时又可以免去显式同步的困扰.虽然创建类型化角色并不困难,但此时我们的EnergySource却还是一个丢失了关键特性的半成品--即还没有可以周期性自动补充电量的能力. 在上一章我们所实现的版本中,由于整个动作都是在后台完成,所以电量补充的动作是不需要任何用户介入的.只要我们启动了电源,就会有一个专门的timer负责每秒钟为电源增加一格电量. 然而在使用了类型化角色的版本中,实

讨喜的隔离可变性(三)创建角色

正如前面曾经提到过的那样,虽然我们有很多支持角色的类库可供选择,但是在本书中我们将使用Akka.这是一个基于Scala的类库,该类库拥有非常好的性能和可扩展性.并同时支持角色和STM.此外,该类库还可以被用于多种JVM上的语言中.在本章中,我们将注意力集中在Java和Scala身上.而在下一章,我们将会学习如何在其他语言中使用Akka的角色.   图 8‑2 某个角色的生存周期 由于Akka是用Scala实现的,所以在Scala中创建和使用角色非常简单并且更加自然,从Akka API的实现里我们