《Spring 5 官方文档》26. JMS(二)

26.4 接收消息

26.4.1 同步接收

虽然 JMS 通常与异步处理相关,但它也可以同步地消费消息。可重载的receive(..)方法提供了这个功能。在同步接收期间,调用线程阻塞,直到接收到消息。这可能是一个危险的操作,因为调用线程可能无限期地被阻塞。receiveTimeout属性指定了接收者等待消息的超时时间。

26.4.2 异步接收 – 消息驱动的 POJOs

Spring 还可以通过使用@JmsListener注解来支持监听注解端点,并提供了一种以编程方式注册端点的开放式基础架构。 这是设置异步接收器的最方便的方法,有关详细信息,请参见第26.6.1节“启用监听端点注解”

类似于 EJB 世界里流行的消息驱动 bean(MDB),消息驱动 POJO(MDP) 作为 JMS 消息的接收器。MDP 的一个约束(请看下面的有关javax.jms.MessageListener类的讨论)是它必须实现javax.jms.MessageListener接口。另外当你的 POJO 将以多线程的方式接收消息时必须确保你的代码是线程安全的。

下面是 MDP 的一个简单实现:

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class ExampleListener implements MessageListener {

	public void onMessage(Message message) {
		if (message instanceof TextMessage) {
			try {
				System.out.println(((TextMessage) message).getText());
			}
			catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		}
		else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}

}

一旦你实现了MessageListener接口,下面该创建一个消息监听容器了。

请看下面例子是如何定义和配置一个随 Sping 发行的消息侦听容器的(这个例子用DefaultMessageListenerContainer)。

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener" />

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener" />
</bean>

请参阅各种消息监听容器的 Spring javadocs,以了解每个实现所支持功能的完整描述。

26.4.3 SessionAwareMessageListener 接口

SessionAwareMessageListener接口是一个 Spring 专门用来提供类似于 JMS MessageListener的接口,也提供了从接收Message来访问 JMS Session的消息处理方法。

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

	void onMessage(Message message, Session session) throws JMSException;

}

如果你希望你的 MDP 可以响应所有接收到的消息(使用onMessage(Message, Session)方法提供的Session)那么你可以选择让你的 MDP 实现这个接口(优先于标准的 JMS MessageListener接口)。所有随 Spring 发行的支持 MDP 的消息监听容器都支持MessageListenerSessionAwareMessageListener接口的实现。要注意的是实现了SessionAwareMessageListener接口的类通过接口与 Spring 有了耦合。是否选择使用它完全取决于开发者或架构师。

请注意SessionAwareMessageListener接口的onMessage(..)方法会抛出JMSException异常。和标准 JMS MessageListener接口相反,当使用SessionAwareMessageListener接口时,客户端代码负责处理所有抛出的异常。

26.4.4 MessageListenerAdapter

MessageListenerAdapter类是 Spring 的异步支持消息类中的最后一个组建:简而言之,它允许您将几乎任何类都暴露为MDP(当然有一些限制)。

请考虑以下接口定义。请注意,虽然该接口既不继承MessageListener,也不继承SessionAwareMessageListener接口,但通过MessageListenerAdapter类依然可以当作一个 MDP 使用。还要注意,各种消息处理方法是如何根据可以接收和处理的各种消息的内容进行强类型匹配的。

public interface MessageDelegate {

	void handleMessage(String message);

	void handleMessage(Map message);

	void handleMessage(byte[] message);

	void handleMessage(Serializable message);

}
public class DefaultMessageDelegate implements MessageDelegate {
	// implementation elided for clarity...
}

尤其要注意的是,上述MessageDelegate接口的实现(上述DefaultMessageDelegate类)完全不依赖于 JMS。它是一个真正的 POJO,我们可以通过如下配置把它设置成 MDP。

<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultMessageDelegate"/>
	</constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener" />
</bean>

以下是另一个只能接收 JMS TextMessage消息的 MDP 示例。注意消息处理方法是如何实际调用receive(在MessageListenerAdapter中默认的消息处理方法的名字是handleMessage)的,但是它是可配置的(从下面可以看到)。注意receive(..)方法是如何使用强制类型来只接收和处理JMS TextMessage消息的。

public interface TextMessageDelegate {

	void receive(TextMessage message);

}
public class DefaultTextMessageDelegate implements TextMessageDelegate {
	// implementation elided for clarity...
}

辅助的MessageListenerAdapter类配置文件类似如下:

<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultTextMessageDelegate"/>
	</constructor-arg>
	<property name="defaultListenerMethod" value="receive"/>
	<!-- we don't want automatic message context extraction -->
	<property name="messageConverter">
		<null/>
	</property>
</bean>

请注意,如果上述messageListener接收到不是TextMessage类型的 JMS 消息,则会抛出IllegalStateException(随之产生的其他异常只被捕获而不处理)。MessageListenerAdapter还有一个功能就是如果处理方法返回一个非空值,它将自动返回一个响应消息。请看下面的接口及其实现:

public interface ResponsiveTextMessageDelegate {

	// notice the return type...
	String receive(TextMessage message);

}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
	// implementation elided for clarity...
}

如果将上述DefaultResponsiveTextMessageDelegateMessageListenerAdapter联合使用,那么从执行receive(..)方法返回的任何非空值都将(缺省情况下)转换为TextMessage。这个返回的TextMessage将被发送到原来的Message中 JMS Reply-To 属性定义的目的地(如果存在),或者是MessageListenerAdapter设置(如果配置了)的缺省目的地;如果没有定义目的地,那么将产生一个InvalidDestinationException异常(此异常将不会只被捕获而不处理,它将沿着调用堆栈上传)。

26.4.5 事务中的消息处理

在事务中调用消息监听器只需要重新配置监听容器。

本地资源事务可以通过监听容器上定义的sessionTransacted标志进行简单地激活。 然后,每个消息监听器调用将在激活的 JMS 事务中进行操作,并在监听器执行失败的情况下进行消息回滚。 发送响应消息(通过SessionAwareMessageListener)将成为同一本地事务的一部分,但任何其他资源操作(如数据库访问)将独立运行。 在监听器的实现中通常需要进行重复消息的检测,覆盖数据库处理已经提交但消息处理提交失败的情况。

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="sessionTransacted" value="true"/>
</bean>

对于参与外部管理的事务,你将需要配置一个事务管理器并使用支持外部管理事务的监听容器:通常为DefaultMessageListenerContainer

要配置 XA 事务参与的消息监听容器,您需要配置一个JtaTransactionManager(默认情况下,它将委托给 Java EE 服务器的事务子系统)。请注意,底层的 JMS ConnectionFactory需要具有 XA 能力并且正确地注册到你的 JTA 事务协调器上!(检查你的 Java EE 服务的 JNDI 资源配置。)这允许消息接收以及例如同一事务下的数据库访问(具有统一提交语义,以 XA 事务日志开销为代价)。

<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

然后,你只需要将它添加到我们之前的容器配置中。其余的交给容器处理。

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="transactionManager" ref="transactionManager"/>
</bean>

26.5 支持 JCA 消息端点

从 Spring2.5 版本开始,Spring 也提供了基于 JCA MessageListener容器的支持。JmsMessageEndpointManager将根据提供者ResourceAdapter的类名自动地决定ActivationSpec类名。因此,通常它只提供如下例所示的 Spring 的通用JmsActivationSpecConfig

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
	<property name="resourceAdapter" ref="resourceAdapter"/>
	<property name="activationSpecConfig">
		<bean class="org.springframework.jms.listener.endpoint.JmsActivationSpecConfig">
			<property name="destinationName" value="myQueue"/>
		</bean>
	</property>
	<property name="messageListener" ref="myMessageListener"/>
</bean>

或者,您可以使用给定的ActivationSpec对象设置JmsMessageEndpointManagerActivationSpec对象也可能来自 JNDI 查找(使用<jee:jndi-lookup>)。

<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
	<property name="resourceAdapter" ref="resourceAdapter"/>
	<property name="activationSpec">
		<bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
			<property name="destination" value="myQueue"/>
			<property name="destinationType" value="javax.jms.Queue"/>
		</bean>
	</property>
	<property name="messageListener" ref="myMessageListener"/>
</bean>

使用 Spring 的 ResourceAdapterFactoryBean,目标ResourceAdapter可以在本地配置,如以下示例所示。

<bean id="resourceAdapter" class="org.springframework.jca.support.ResourceAdapterFactoryBean">
	<property name="resourceAdapter">
		<bean class="org.apache.activemq.ra.ActiveMQResourceAdapter">
			<property name="serverUrl" value="tcp://localhost:61616"/>
		</bean>
	</property>
	<property name="workManager">
		<bean class="org.springframework.jca.work.SimpleTaskWorkManager"/>
	</property>
</bean>

指定的WorkManager也可能指向环境特定的线程池 – 通常通过SimpleTaskWorkManagerasyncTaskExecutor属性。如果,你恰好考虑使用多个适配器,为你的所有ResourceAdapter实例定义一个共享线程池。

在某些环境(例如 WebLogic 9或更高版本)中,可以从 JNDI 中获取整个ResourceAdapter对象(使用<jee:jndi-lookup>)。然后,基于Spring 的消息监听器可以与服务器托管的ResourceAdapter进行交互,也可以使用服务内置的WorkManager

有关更多详细信息,请参阅JMSMessageEndpointManagerJmsActivationSpecConfig和“`ResourceAdapterFactoryBean“的 JavaDoc。

Spring 还提供了一个通用的 JCA 消息端点管理器,它不绑定到 JMS :org.springframework.jca.endpoint.GenericMessageEndpointManager。 它允许使用任何消息监听器类型(例如 CCI MessageListener)和任何提供者特定的ActivationSpec对象。从所涉及 JCA 提供者的文档可以找到这个连接器的实际能力,并参考“GenericMessageEndpointManager的 JavaDoc ”来了解 Spring 特有的配置详细信息。

基于 JCA 的消息端点管理器与 EJB 2.1的消息驱动 Bean 很相似;它使用了提供者们约定的相同底层资源。 与 EJB 2.1 MDB 一样,任何被 JCA 提供者支持的消息监听器接口都可以在 Spring 上下文中使用。尽管如此,Spring 仍为 JMS 提供了显式的“方便的”支持,很显然是因为 JMS 是 JCA 端点管理约定中最通用的端点 API。

转载自 并发编程网 - ifeve.com

时间: 2024-10-27 20:27:44

《Spring 5 官方文档》26. JMS(二)的相关文章

《Spring 5官方文档》翻译邀请

公司新的应用已经开始使用Spring 5,所以本月组织大家翻译<Spring 5 官方文档> SINGLE网页版  PDF版本. 如何领取 通过评论领取想要翻译的文章,每次领取一章或一节(根据内容长短),翻译完后再领取其他章节.领取完成之后,建议在一个星期内翻译完成,如果不能完成翻译,也欢迎你邀请其他同学和你一起完成翻译.请谨慎领取,并发网是非盈利组织,没办法去跟进每一篇译文的进展,所以很多文章领取了没有翻译,会导致文章长时间没人翻译. 如何提交? 翻译完成之后请登录到并发编程网后台,点击左上

《Spring 5 官方文档》26. JMS(一)

26.1 介绍 Spring 提供了一个 JMS 的集成框架,简化了 JMS API 的使用,就像 Spring 对 JDBC API 的集成一样. JMS 大致可分为两块功能,即消息的生产与消费.JmsTemplate类用于消息生产和消息的同步接收. 对于类似 Java EE 的消息驱动 Bean 形式的异步接收,Spring 提供了大量用于创建消息驱动 POJOs(MDPs)的消息监听器.Spring 还提供了一种创建消息侦听器的声明式方法. org.springframework.jms.

《Spring 5 官方文档》26. JMS(三)

26.6 注解驱动的监听端点 异步接收消息的最简单的方法是使用注解监听端点的基础架构.简而言之,它允许你暴露托管一个 bean 的方法作为一个 JMS 的监听端点. @Component public class MyService { @JmsListener(destination = "myDestination") public void processOrder(String data) { ... } } 上述示例的想法是,每当javax.jms.Destination &

《Spring 5 官方文档》4. 资源(二)

4.6 资源依赖 如果bean本身将通过某种动态过程来确定和提供资源路径,那么bean可以使用ResourceLoader接口来加载资源. j假设以某种方式加载一个模板,其中需要的特定资源取决于用户的角色. 如果资源是静态的,那么完全消除ResourceLoader接口的使用是有意义的,只需让bean公开它需要的Resource属性,那么它们就会以你所期望的方式被注入. 什么使得它们轻松注入这些属性,是所有应用程序上下文注册和使用一个特殊的JavaBeans PropertyEditor,它可以

《Spring 5 官方文档》16.ORM和数据访问(二)

16.3.4编程式事务划分 开发者可以在应用程序的更高级别上对事务进行标定,而不用考虑低级别的数据访问执行了多少操作.这样不会对业务服务的实现进行限制:只需要定义一个Spring的PlatformTransactionManager即可.当然,PlatformTransactionManager可以从多处获取,但最好是通过setTransactionManager(..)方法以Bean来注入,正如ProductDAO应该由setProductDao(..)方法配置一样.下面的代码显示Spring

《Spring 5官方文档》11集成测试 (二)

11.3 JDBC测试支持 org.springframework.test.jdbc是包含JdbcTestUtils的包,它是一个JDBC相关的工具方法集,意在简化标准数据库测试场景.特别地,JdbcTestUtils提供以下静态工具方法: countRowsInTable(..):统计给定表的行数. countRowsInTableWhere(..):使用提供的where语句进行筛选统计给定表的行数. deleteFromTables(..):删除特定表的全部数据. deleteFromTa

《Spring 5 官方文档》5. 验证、数据绑定和类型转换(二)

5.5 Spring类型转换 Spring 3引入了core.convert包来提供一个一般类型的转换系统.这个系统定义了实现类型转换逻辑的服务提供接口(SPI)以及在运行时执行类型转换的API.在Spring容器内,这个系统可以当作是PropertyEditor的替代选择,用于将外部bean的属性值字符串转换成所需的属性类型.这个公共的API也可以在你的应用程序中任何需要类型转换的地方使用. 5.5.1 Converter SPI 实现类型转换逻辑的SPI是简单并且强类型的: package

《Spring 5 官方文档》18. Web MVC 框架(二)

Spring MVC 3.1中的@RequestMapping方法的新支持类 Spring 3.1分别为@RequestMapping方法引入了一组新的支持类,分别叫做RequestMappingHandlerMapping和RequestMappingHandlerAdapter.它们被推荐使用,甚至需要利用Spring MVC 3.1中的新功能和未来.默认情况下,MVC命名空间和MVC Java配置启用新的支持类,但是如果不使用,则必须显式配置.本节介绍旧支持类和新支持类之间的一些重要区别.

《Spring 5 官方文档》18. Web MVC 框架(五)

自定义WebDataBinder初始化 要通过Spring定制与PropertyEditor的请求参数绑定 WebDataBinder,可以使用@InitBinder控制器中的-annotated @InitBinder方法,@ControllerAdvice类中的方法或提供自定义 WebBindingInitializer.有关更多详细信息,请参阅"使用@ControllerAdvice和@RestControllerAdvice建议控制器"一节. 使用@InitBinder自定义数