JMS 到 JMS 桥接
警告,首先尝试 Camel!
请注意,我们建议您考虑使用 Apache Camel 将 ActiveMQ Classic 桥接到任何消息代理(或实际上是 任何其他技术、协议或中间件),因为这更容易
- 保持灵活性;它很容易将不同的队列/主题映射到另一个提供程序上的一个或多个队列或主题
- 执行基于内容的路由、过滤和其他 企业集成模式
- 允许您使用 任何技术、协议或中间件,而不仅仅是 JMS 提供程序
例如,在您的 Spring XML 文件中,只需添加
<camelContext xmlns="http://camel.apache.org/schema/spring"> <route> <from uri="mqseries:Foo.Bar"/> <to uri="activemq:Cheese"/> </route> </camelContext>
简介
ActiveMQ Classic 提供了桥接到其他 JMS 提供程序的功能,这些提供程序实现了 JMS 1.0.2 及更高版本规范。JMS 桥接可以与 ActiveMQ Classic 代理位于同一位置或远程运行。为了支持 JMS 1.0.2,队列和主题之间存在分离。
入站消息交换中的临时目标和 replyTo 目标将自动处理,使 ActiveMQ Classic 服务能够处理外部 JMS TopicRequestor 或 QueueResquestor 交换。
属性
JMS 桥接主题连接器
属性名称 | 默认值 | 描述 |
---|---|---|
localTopicConnection | null | 如果设置,将用于连接到 ActiveMQ Classic |
localTopicConnectionFactory | null | 用于初始化 ActiveMQ Classic JMS 连接,如果未设置 localTopicConnection |
localClientId | null | 设置本地连接的 ID |
outboundClientId | null | 设置出站连接的 ID |
jndiLocalTemplate | Spring 默认模板 | 用于查找 ActiveMQ Classic 连接的连接工厂,如果未设置 localTopicConnection 或 localTopicConnectionFactory |
outboundTopicConnection | null | 如果设置,将用于连接到外部 JMS 提供程序 |
outboundTopicConnectionFactory | null | 用于初始化外部 JMS 连接,如果未设置 outboundTopicConnection |
jndiOutboundTemplate | Spring 默认模板 | 用于查找 ActiveMQ Classic 连接的连接工厂,如果未设置 localTopicConnection 或 localTopicConnectionFactory |
localUsername | null | 如果设置,将用于向 ActiveMQ Classic JMS 提供程序进行身份验证 |
localPassword | null | 如果设置,将用于向 ActiveMQ Classic JMS 提供程序进行身份验证 |
outboundUsername | null | 如果设置,将用于向外部 JMS 提供程序进行身份验证 |
outboundPassword | null | 如果设置,将用于向外部 JMS 提供程序进行身份验证 |
inboundMessageConvertor | null | 如果设置,将用于将外部 JMS 消息转换为 ActiveMQ Classic 的格式 |
outboundMessageConvertor | null | 如果设置,将用于将 ActiveMQ Classic 消息转换为外部 JMS 提供程序的格式 |
inboundTopicBridges | null | InboundTopicBridge 实例数组 - 用于定义来自外部 JMS 提供程序的入站(订阅)流量 |
outboundTopicBridges | null | OutboundTopicBridge 实例数组 - 用于定义将发布到外部 JMS 提供程序的目标 |
JMS 桥接队列连接器
属性名称 | 默认值 | 描述 |
---|---|---|
localQueueConnection | null | 如果设置,将用于连接到 ActiveMQ Classic |
localQueueConnectionFactory | null | 用于初始化 ActiveMQ Classic JMS 连接,如果未设置 localQueueConnection |
localClientId | null | 设置本地连接的 ID |
outboundClientId | null | 设置出站连接的 ID |
jndiLocalTemplate | Spring 默认模板 | 用于查找 ActiveMQ Classic 连接的连接工厂,如果未设置 localQueueConnection 或 localQueueConnectionFactory |
outboundQueueConnection | null | 如果设置,将用于连接到外部 JMS 提供程序 |
outboundQueueConnectionFactory | null | 用于初始化外部 JMS 连接,如果未设置 localQueueConnection |
jndiOutboundTemplate | Spring 默认模板 | 用于查找 ActiveMQ Classic 连接的连接工厂,如果未设置 localQueueConnection 或 localQueueConnectionFactory |
localUsername | null | 如果设置,将用于向 ActiveMQ Classic JMS 提供程序进行身份验证 |
localPassword | null | 如果设置,将用于向 ActiveMQ Classic JMS 提供程序进行身份验证 |
outboundUsername | null | 如果设置,将用于向外部 JMS 提供程序进行身份验证 |
outboundPassword | null | 如果设置,将用于向外部 JMS 提供程序进行身份验证 |
inboundMessageConvertor | null | 如果设置,将用于将外部 JMS 消息转换为 ActiveMQ Classic 的格式 |
outboundMessageConvertor | null | 如果设置,将用于将 ActiveMQ Classic 消息转换为外部 JMS 提供程序的格式 |
inboundQueueBridges | null | InboundQueueBridge 实例数组 - 用于定义来自外部 JMS 提供程序的入站(订阅)流量 |
outboundQueueBridges | null | OutboundQueueBridge 实例数组 - 用于定义将转发到外部 JMS 提供程序的目标 |
主题桥接
InboundTopicBridge
属性名称 | 默认值 | 描述 |
---|---|---|
localTopicName | null | 本地 ActiveMQ Classic 队列的名称 |
inboundTopicName | null | 要订阅的外部主题名称 |
selector | null | 要使用的选择器 - 如果有 |
consumerName | null | 如果设置,将创建一个持久消费者 |
OutboundTopicBridge
属性名称 | 默认值 | 描述 |
---|---|---|
localTopicName | null | 本地 ActiveMQ Classic 队列的名称 |
outboundTopicName | null | 要发布到的外部主题名称 |
队列桥接
InboundQueueBridge
属性名称 | 默认值 | 描述 |
---|---|---|
localQueueName | null | 本地 ActiveMQ Classic 队列的名称 |
inboundQueueName | null | 要从中接收的外部队列名称 |
selector | null | 要使用的选择器 - 如果有 |
OutboundQueueBridge
属性名称 | 默认值 | 描述 |
---|---|---|
localQueueName | null | 本地 ActiveMQ Classic 队列的名称 |
outboundQueueName | null | 要发送到的外部队列名称 |
示例 XBean 配置
以下 示例配置文件 展示了如何使用常规 Xml 配置 配置 JMS 到 JMS 桥接。
<broker xmlns="https://activemq.apache.org/schema/core" id="localbroker"
brokerName="localBroker" persistent="false">
<jmsBridgeConnectors>
<jmsQueueConnector
outboundQueueConnectionFactory="#remoteFactory">
<inboundQueueBridges>
<inboundQueueBridge
inboundQueueName="org.apache.activemq.network.jms.QueueBridgeXBeanTest" />
</inboundQueueBridges>
</jmsQueueConnector>
</jmsBridgeConnectors>
<transportConnectors>
<transportConnector uri="tcp://127.0.0.1:61234" />
</transportConnectors>
</broker>
<!-- JMS ConnectionFactory to use remote -->
<bean id="remoteFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61666" />
</bean>
示例纯 Spring 配置
以下示例展示了如何使用原始 Spring XML 将代理与桥接到外部 JMS 提供程序连接起来
<!-- local broker with embedded Jms to Jms bridge (ok - it's contrived) -->
<bean id="localbroker" class="org.apache.activemq.broker.BrokerService"
init-method="start">
<property name="brokerName" value = "localBroker"/>
<property name="persistent" value = "false"/>
<property name="transportConnectorURIs">
<list>
<value>tcp://127.0.0.1:61234</value>
</list>
</property>
<property name="jmsBridgeConnectors">
<list>
<ref bean="jmsConnector"/>
</list>
</property>
</bean>
<!-- JMS ConnectionFactory to use local broker (the one with the bridge) -->
<bean id="localFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61234" />
</bean>
<!--JmsTopicConnector - the Jms bridge -->
<bean id="jmsConnector"
class="org.apache.activemq.network.jms.JmsTopicConnector">
<property name = "outboundTopicConnectionFactory" ref = "remoteFactory"/>
<property name = "inboundTopicBridges">
<list>
<ref bean="InboundTopicBridge" />
</list>
</property>
</bean>
<bean id ="InboundTopicBridge" class="org.apache.activemq.network.jms.InboundTopicBridge">
<property name = "inboundTopicName" value = "org.apache.activemq.network.jms.TopicBridgeSpringTest"/>
</bean>
示例 XBean 配置,将 ActiveMQ Classic 桥接到没有 URL 设置器的提供程序
某些 JMS 提供程序(例如 WebLogic)在其 ConnectionFactory 对象上没有公开连接属性(如主机和端口 (setBrokerUrl))的设置器。在这种情况下,您需要在 activemq.xml 配置文件中设置 outboundQueueConnectionFactoryName 和 jndiOutboundTemplate。
<!-- START SNIPPET: example -->
<beans>
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker useJmx="true" xmlns="http://activemq.org/config/1.0">
<persistenceAdapter>
<journaledJDBC journalLogFiles="5" dataDirectory="${activemq.home}/activemq-data"/>
</persistenceAdapter>
<transportConnectors>
<transportConnector name="default" uri="tcp://127.0.0.1:61616" discoveryUri="multicast://default"/>
<transportConnector name="stomp" uri="stomp://127.0.0.1:61613"/>
</transportConnectors>
<networkConnectors>
<networkConnector name="default" uri="multicast://default"/>
</networkConnectors>
<jmsBridgeConnectors>
<jmsQueueConnector name="JreportRequestBridge-Inbound"
jndiOutboundTemplate="#remoteJndi"
outboundQueueConnectionFactoryName="jms/ConnectionFactory"
localQueueConnectionFactory="#localFactory">
<inboundQueueBridges>
<inboundQueueBridge inboundQueueName="jms/queue/jreport/request"/>
</inboundQueueBridges>
</jmsQueueConnector>
</jmsBridgeConnectors>
</broker>
<!-- Set up the template for connecting to Weblogic -->
<bean id="remoteJndi" class="org.springframework.jndi.JndiTemplate">
<property name="environment">
<props>
<prop key="java.naming.factory.initial">weblogic.jndi.WLInitialContextFactory</prop>
<prop key="java.naming.provider.url">t3://<your ip here>:7001</prop>
</props>
</property>
</bean>
<bean id="localFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:61616" />
</bean>
<bean id="localQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="dynamic/jms.queue.jreport.request"/>
</bean>
</beans>
<!-- END SNIPPET: xbean -->
示例纯 Spring 配置,用于通过桥接将消息发送到外部 ActiveMQ Classic 目标
Spring bean
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">
<beans>
<bean id="mainBroker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
<property name="brokerName" value = "mainBroker"/>
<property name="persistent" value="false"/>
<property name="transportConnectorURIs">
<list>
<value>tcp://127.0.0.1:7000</value>
</list>
</property>
</bean>
<bean id="bridgedBroker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
<property name="brokerName" value = "bridgedBroker"/>
<property name="persistent" value="false"/>
<property name="transportConnectorURIs">
<list>
<value>tcp://127.0.0.1:7001</value>
</list>
</property>
<property name="jmsBridgeConnectors">
<list>
<bean class="org.apache.activemq.network.jms.JmsQueueConnector">
<property name="outboundQueueConnectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:7000" />
</bean>
</property>
<property name="outboundQueueBridges">
<list>
<bean class="org.apache.activemq.network.jms.OutboundQueueBridge">
<constructor-arg value="messages.input"/>
</bean>
</list>
</property>
</bean>
</list>
</property>
</bean>
</beans>
Java 代码
public class BridgeTest {
public BridgeTest() throws Exception {
Log log = LogFactory.getLog(getClass());
new ClassPathXmlApplicationContext("bridge/context-bridge.xml");
ActiveMQConnection connection = ActiveMQConnection.makeConnection("tcp://127.0.0.1:7001");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("messages.input");
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage("Test Message"));
log.debug("send message");
session.close();
connection.close();
connection = ActiveMQConnection.makeConnection("tcp://127.0.0.1:7000");
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("messages.input");
MessageConsumer consumer = session.createConsumer(destination);
log.debug("receive message");
Message message = consumer.receive(5000);
log.debug("Received: " + message);
session.close();
connection.close();
}
public static void main(String[] args) throws Exception {
new BridgeTest();
}
}