26.4 接收消息
26.4.1 同步接收
虽然 JMS 通常与异步处理相关,但它也可以同步地消费消息。可重载的receive(..)
方法提供了这个功能。在同步接收期间,调用线程阻塞,直到接收到消息。这可能是一个危险的操作,因为调用线程可能无限期地被阻塞。receiveTimeout
属性指定了接收者等待消息的超时时间。
26.4.2 异步接收 – 消息驱动的 POJOs
Spring 还可以通过使用
@JmsListener
注解来支持监听注解端点,并提供了一种以编程方式注册端点的开放式基础架构。 这是设置异步接收器的最方便的方法,有关详细信息,请参见第26.6.1节“启用监听端点注解”。
类似于 EJB 世界里流行的消息驱动 bean(MDB),消息驱动 POJO(MDP) 作为 JMS 消息的接收器。MDP 的一个约束(请看下面的有关javax.jms.MessageListener
类的讨论)是它必须实现javax.jms.MessageListener
接口。另外当你的 POJO 将以多线程的方式接收消息时必须确保你的代码是线程安全的。
下面是 MDP 的一个简单实现:
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
System.out.println(((TextMessage) message).getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
一旦你实现了MessageListener
接口,下面该创建一个消息监听容器了。
请看下面例子是如何定义和配置一个随 Sping 发行的消息侦听容器的(这个例子用DefaultMessageListenerContainer
)。
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener" />
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener" />
</bean>
请参阅各种消息监听容器的 Spring javadocs,以了解每个实现所支持功能的完整描述。
26.4.3 SessionAwareMessageListener 接口
SessionAwareMessageListener
接口是一个 Spring 专门用来提供类似于 JMS MessageListener
的接口,也提供了从接收Message
来访问 JMS Session
的消息处理方法。
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
如果你希望你的 MDP 可以响应所有接收到的消息(使用onMessage(Message, Session)
方法提供的Session
)那么你可以选择让你的 MDP 实现这个接口(优先于标准的 JMS MessageListener
接口)。所有随 Spring 发行的支持 MDP 的消息监听容器都支持MessageListener
或SessionAwareMessageListener
接口的实现。要注意的是实现了SessionAwareMessageListener
接口的类通过接口与 Spring 有了耦合。是否选择使用它完全取决于开发者或架构师。
请注意SessionAwareMessageListener
接口的onMessage(..)
方法会抛出JMSException
异常。和标准 JMS MessageListener
接口相反,当使用SessionAwareMessageListener
接口时,客户端代码负责处理所有抛出的异常。
26.4.4 MessageListenerAdapter
MessageListenerAdapter
类是 Spring 的异步支持消息类中的最后一个组建:简而言之,它允许您将几乎任何类都暴露为MDP(当然有一些限制)。
请考虑以下接口定义。请注意,虽然该接口既不继承MessageListener
,也不继承SessionAwareMessageListener
接口,但通过MessageListenerAdapter
类依然可以当作一个 MDP 使用。还要注意,各种消息处理方法是如何根据可以接收和处理的各种消息的内容进行强类型匹配的。
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
public class DefaultMessageDelegate implements MessageDelegate {
// implementation elided for clarity...
}
尤其要注意的是,上述MessageDelegate
接口的实现(上述DefaultMessageDelegate
类)完全不依赖于 JMS。它是一个真正的 POJO,我们可以通过如下配置把它设置成 MDP。
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener" />
</bean>
以下是另一个只能接收 JMS TextMessage
消息的 MDP 示例。注意消息处理方法是如何实际调用receive
(在MessageListenerAdapter
中默认的消息处理方法的名字是handleMessage
)的,但是它是可配置的(从下面可以看到)。注意receive(..)
方法是如何使用强制类型来只接收和处理JMS TextMessage
消息的。
public interface TextMessageDelegate {
void receive(TextMessage message);
}
public class DefaultTextMessageDelegate implements TextMessageDelegate {
// implementation elided for clarity...
}
辅助的MessageListenerAdapter
类配置文件类似如下:
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
请注意,如果上述messageListener
接收到不是TextMessage
类型的 JMS 消息,则会抛出IllegalStateException
(随之产生的其他异常只被捕获而不处理)。MessageListenerAdapter
还有一个功能就是如果处理方法返回一个非空值,它将自动返回一个响应消息。请看下面的接口及其实现:
public interface ResponsiveTextMessageDelegate {
// notice the return type...
String receive(TextMessage message);
}
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
// implementation elided for clarity...
}
如果将上述DefaultResponsiveTextMessageDelegate
与MessageListenerAdapter
联合使用,那么从执行receive(..)
方法返回的任何非空值都将(缺省情况下)转换为TextMessage
。这个返回的TextMessage
将被发送到原来的Message
中 JMS Reply-To 属性定义的目的地(如果存在),或者是MessageListenerAdapter
设置(如果配置了)的缺省目的地;如果没有定义目的地,那么将产生一个InvalidDestinationException
异常(此异常将不会只被捕获而不处理,它将沿着调用堆栈上传)。
26.4.5 事务中的消息处理
在事务中调用消息监听器只需要重新配置监听容器。
本地资源事务可以通过监听容器上定义的sessionTransacted
标志进行简单地激活。 然后,每个消息监听器调用将在激活的 JMS 事务中进行操作,并在监听器执行失败的情况下进行消息回滚。 发送响应消息(通过SessionAwareMessageListener
)将成为同一本地事务的一部分,但任何其他资源操作(如数据库访问)将独立运行。 在监听器的实现中通常需要进行重复消息的检测,覆盖数据库处理已经提交但消息处理提交失败的情况。
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
对于参与外部管理的事务,你将需要配置一个事务管理器并使用支持外部管理事务的监听容器:通常为DefaultMessageListenerContainer
。
要配置 XA 事务参与的消息监听容器,您需要配置一个JtaTransactionManager
(默认情况下,它将委托给 Java EE 服务器的事务子系统)。请注意,底层的 JMS ConnectionFactory
需要具有 XA 能力并且正确地注册到你的 JTA 事务协调器上!(检查你的 Java EE 服务的 JNDI 资源配置。)这允许消息接收以及例如同一事务下的数据库访问(具有统一提交语义,以 XA 事务日志开销为代价)。
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
然后,你只需要将它添加到我们之前的容器配置中。其余的交给容器处理。
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>
26.5 支持 JCA 消息端点
从 Spring2.5 版本开始,Spring 也提供了基于 JCA MessageListener
容器的支持。JmsMessageEndpointManager
将根据提供者ResourceAdapter
的类名自动地决定ActivationSpec
类名。因此,通常它只提供如下例所示的 Spring 的通用JmsActivationSpecConfig
。
<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
<property name="resourceAdapter" ref="resourceAdapter"/>
<property name="activationSpecConfig">
<bean class="org.springframework.jms.listener.endpoint.JmsActivationSpecConfig">
<property name="destinationName" value="myQueue"/>
</bean>
</property>
<property name="messageListener" ref="myMessageListener"/>
</bean>
或者,您可以使用给定的ActivationSpec
对象设置JmsMessageEndpointManager
。ActivationSpec
对象也可能来自 JNDI 查找(使用<jee:jndi-lookup>
)。
<bean class="org.springframework.jms.listener.endpoint.JmsMessageEndpointManager">
<property name="resourceAdapter" ref="resourceAdapter"/>
<property name="activationSpec">
<bean class="org.apache.activemq.ra.ActiveMQActivationSpec">
<property name="destination" value="myQueue"/>
<property name="destinationType" value="javax.jms.Queue"/>
</bean>
</property>
<property name="messageListener" ref="myMessageListener"/>
</bean>
使用 Spring 的 ResourceAdapterFactoryBean
,目标ResourceAdapter
可以在本地配置,如以下示例所示。
<bean id="resourceAdapter" class="org.springframework.jca.support.ResourceAdapterFactoryBean">
<property name="resourceAdapter">
<bean class="org.apache.activemq.ra.ActiveMQResourceAdapter">
<property name="serverUrl" value="tcp://localhost:61616"/>
</bean>
</property>
<property name="workManager">
<bean class="org.springframework.jca.work.SimpleTaskWorkManager"/>
</property>
</bean>
指定的WorkManager
也可能指向环境特定的线程池 – 通常通过SimpleTaskWorkManager
的asyncTaskExecutor
属性。如果,你恰好考虑使用多个适配器,为你的所有ResourceAdapter
实例定义一个共享线程池。
在某些环境(例如 WebLogic 9或更高版本)中,可以从 JNDI 中获取整个ResourceAdapter
对象(使用<jee:jndi-lookup>
)。然后,基于Spring 的消息监听器可以与服务器托管的ResourceAdapter
进行交互,也可以使用服务内置的WorkManager
。
有关更多详细信息,请参阅JMSMessageEndpointManager
、JmsActivationSpecConfig
和“`ResourceAdapterFactoryBean“的 JavaDoc。
Spring 还提供了一个通用的 JCA 消息端点管理器,它不绑定到 JMS :org.springframework.jca.endpoint.GenericMessageEndpointManager
。 它允许使用任何消息监听器类型(例如 CCI MessageListener
)和任何提供者特定的ActivationSpec
对象。从所涉及 JCA 提供者的文档可以找到这个连接器的实际能力,并参考“GenericMessageEndpointManager
的 JavaDoc ”来了解 Spring 特有的配置详细信息。
基于 JCA 的消息端点管理器与 EJB 2.1的消息驱动 Bean 很相似;它使用了提供者们约定的相同底层资源。 与 EJB 2.1 MDB 一样,任何被 JCA 提供者支持的消息监听器接口都可以在 Spring 上下文中使用。尽管如此,Spring 仍为 JMS 提供了显式的“方便的”支持,很显然是因为 JMS 是 JCA 端点管理约定中最通用的端点 API。