JMS 到 JMS 桥接

连接性 > JMS 到 JMS 桥接

警告,首先尝试 Camel!

请注意,我们建议您考虑使用 Apache Camel 将 ActiveMQ Classic 桥接到任何消息代理(或实际上是 任何其他技术、协议或中间件),因为这更容易

  • 保持灵活性;它很容易将不同的队列/主题映射到另一个提供程序上的一个或多个队列或主题
  • 执行基于内容的路由、过滤和其他 企业集成模式
  • 允许您使用 任何技术、协议或中间件,而不仅仅是 JMS 提供程序

例如,在您的 Spring XML 文件中,只需添加

<camelContext xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="mqseries:Foo.Bar"/>
    <to uri="activemq:Cheese"/>
  </route>
</camelContext>

简介

ActiveMQ Classic 提供了桥接到其他 JMS 提供程序的功能,这些提供程序实现了 JMS 1.0.2 及更高版本规范。JMS 桥接可以与 ActiveMQ Classic 代理位于同一位置或远程运行。为了支持 JMS 1.0.2,队列和主题之间存在分离。

入站消息交换中的临时目标和 replyTo 目标将自动处理,使 ActiveMQ Classic 服务能够处理外部 JMS TopicRequestor 或 QueueResquestor 交换。

属性

JMS 桥接主题连接器

属性名称 默认值 描述
localTopicConnection null 如果设置,将用于连接到 ActiveMQ Classic
localTopicConnectionFactory null 用于初始化 ActiveMQ Classic JMS 连接,如果未设置 localTopicConnection
localClientId null 设置本地连接的 ID
outboundClientId null 设置出站连接的 ID
jndiLocalTemplate Spring 默认模板 用于查找 ActiveMQ Classic 连接的连接工厂,如果未设置 localTopicConnection 或 localTopicConnectionFactory
outboundTopicConnection null 如果设置,将用于连接到外部 JMS 提供程序
outboundTopicConnectionFactory null 用于初始化外部 JMS 连接,如果未设置 outboundTopicConnection
jndiOutboundTemplate Spring 默认模板 用于查找 ActiveMQ Classic 连接的连接工厂,如果未设置 localTopicConnection 或 localTopicConnectionFactory
localUsername null 如果设置,将用于向 ActiveMQ Classic JMS 提供程序进行身份验证
localPassword null 如果设置,将用于向 ActiveMQ Classic JMS 提供程序进行身份验证
outboundUsername null 如果设置,将用于向外部 JMS 提供程序进行身份验证
outboundPassword null 如果设置,将用于向外部 JMS 提供程序进行身份验证
inboundMessageConvertor null 如果设置,将用于将外部 JMS 消息转换为 ActiveMQ Classic 的格式
outboundMessageConvertor null 如果设置,将用于将 ActiveMQ Classic 消息转换为外部 JMS 提供程序的格式
inboundTopicBridges null InboundTopicBridge 实例数组 - 用于定义来自外部 JMS 提供程序的入站(订阅)流量
outboundTopicBridges null OutboundTopicBridge 实例数组 - 用于定义将发布到外部 JMS 提供程序的目标

JMS 桥接队列连接器

属性名称 默认值 描述
localQueueConnection null 如果设置,将用于连接到 ActiveMQ Classic
localQueueConnectionFactory null 用于初始化 ActiveMQ Classic JMS 连接,如果未设置 localQueueConnection
localClientId null 设置本地连接的 ID
outboundClientId null 设置出站连接的 ID
jndiLocalTemplate Spring 默认模板 用于查找 ActiveMQ Classic 连接的连接工厂,如果未设置 localQueueConnection 或 localQueueConnectionFactory
outboundQueueConnection null 如果设置,将用于连接到外部 JMS 提供程序
outboundQueueConnectionFactory null 用于初始化外部 JMS 连接,如果未设置 localQueueConnection
jndiOutboundTemplate Spring 默认模板 用于查找 ActiveMQ Classic 连接的连接工厂,如果未设置 localQueueConnection 或 localQueueConnectionFactory
localUsername null 如果设置,将用于向 ActiveMQ Classic JMS 提供程序进行身份验证
localPassword null 如果设置,将用于向 ActiveMQ Classic JMS 提供程序进行身份验证
outboundUsername null 如果设置,将用于向外部 JMS 提供程序进行身份验证
outboundPassword null 如果设置,将用于向外部 JMS 提供程序进行身份验证
inboundMessageConvertor null 如果设置,将用于将外部 JMS 消息转换为 ActiveMQ Classic 的格式
outboundMessageConvertor null 如果设置,将用于将 ActiveMQ Classic 消息转换为外部 JMS 提供程序的格式
inboundQueueBridges null InboundQueueBridge 实例数组 - 用于定义来自外部 JMS 提供程序的入站(订阅)流量
outboundQueueBridges null OutboundQueueBridge 实例数组 - 用于定义将转发到外部 JMS 提供程序的目标

主题桥接

InboundTopicBridge

属性名称 默认值 描述
localTopicName null 本地 ActiveMQ Classic 队列的名称
inboundTopicName null 要订阅的外部主题名称
selector null 要使用的选择器 - 如果有
consumerName null 如果设置,将创建一个持久消费者

OutboundTopicBridge

属性名称 默认值 描述
localTopicName null 本地 ActiveMQ Classic 队列的名称
outboundTopicName null 要发布到的外部主题名称

队列桥接

InboundQueueBridge

属性名称 默认值 描述
localQueueName null 本地 ActiveMQ Classic 队列的名称
inboundQueueName null 要从中接收的外部队列名称
selector null 要使用的选择器 - 如果有

OutboundQueueBridge

属性名称 默认值 描述
localQueueName null 本地 ActiveMQ Classic 队列的名称
outboundQueueName null 要发送到的外部队列名称

示例 XBean 配置

以下 示例配置文件 展示了如何使用常规 Xml 配置 配置 JMS 到 JMS 桥接。

<broker xmlns="https://activemq.apache.org/schema/core" id="localbroker"
  brokerName="localBroker" persistent="false">
  <jmsBridgeConnectors>
    <jmsQueueConnector
      outboundQueueConnectionFactory="#remoteFactory">
      <inboundQueueBridges>
        <inboundQueueBridge
          inboundQueueName="org.apache.activemq.network.jms.QueueBridgeXBeanTest" />
      </inboundQueueBridges>
    </jmsQueueConnector>
  </jmsBridgeConnectors>
  <transportConnectors>
    <transportConnector uri="tcp://127.0.0.1:61234" />
  </transportConnectors>
</broker>
<!-- JMS ConnectionFactory to use remote -->
<bean id="remoteFactory"
  class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://127.0.0.1:61666" />
</bean>

示例纯 Spring 配置

以下示例展示了如何使用原始 Spring XML 将代理与桥接到外部 JMS 提供程序连接起来

<!-- local broker with embedded Jms to Jms bridge (ok - it's contrived) -->
<bean id="localbroker" class="org.apache.activemq.broker.BrokerService"
  init-method="start">
  <property name="brokerName" value = "localBroker"/>
    <property name="persistent" value = "false"/>
  <property name="transportConnectorURIs">
    <list>
      <value>tcp://127.0.0.1:61234</value>
    </list>
  </property>
  <property name="jmsBridgeConnectors">
    <list>
      <ref bean="jmsConnector"/>
    </list>
  </property>
</bean>

<!-- JMS ConnectionFactory to use local broker (the one with the bridge) -->
<bean id="localFactory"
  class="org.apache.activemq.ActiveMQConnectionFactory">
  <property name="brokerURL" value="tcp://127.0.0.1:61234" />
</bean>

<!--JmsTopicConnector - the Jms bridge -->
<bean id="jmsConnector"
  class="org.apache.activemq.network.jms.JmsTopicConnector">
  <property name = "outboundTopicConnectionFactory" ref = "remoteFactory"/>

  <property name = "inboundTopicBridges">
    <list>
      <ref bean="InboundTopicBridge" />
    </list>
  </property>

</bean>

<bean id ="InboundTopicBridge" class="org.apache.activemq.network.jms.InboundTopicBridge">
  <property name = "inboundTopicName" value = "org.apache.activemq.network.jms.TopicBridgeSpringTest"/>
</bean>

示例 XBean 配置,将 ActiveMQ Classic 桥接到没有 URL 设置器的提供程序

某些 JMS 提供程序(例如 WebLogic)在其 ConnectionFactory 对象上没有公开连接属性(如主机和端口 (setBrokerUrl))的设置器。在这种情况下,您需要在 activemq.xml 配置文件中设置 outboundQueueConnectionFactoryName 和 jndiOutboundTemplate。

<!-- START SNIPPET: example -->
<beans>

  <!-- Allows us to use system properties as variables in this configuration file -->
  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>

  <broker useJmx="true" xmlns="http://activemq.org/config/1.0">

    <persistenceAdapter>
        <journaledJDBC journalLogFiles="5" dataDirectory="${activemq.home}/activemq-data"/>
    </persistenceAdapter>

    <transportConnectors>
       <transportConnector name="default" uri="tcp://127.0.0.1:61616" discoveryUri="multicast://default"/>
       <transportConnector name="stomp"   uri="stomp://127.0.0.1:61613"/>
    </transportConnectors>

    <networkConnectors>
      <networkConnector name="default" uri="multicast://default"/>
    </networkConnectors>

    <jmsBridgeConnectors>
     <jmsQueueConnector name="JreportRequestBridge-Inbound"
        jndiOutboundTemplate="#remoteJndi"
        outboundQueueConnectionFactoryName="jms/ConnectionFactory"
        localQueueConnectionFactory="#localFactory">
        <inboundQueueBridges>
          <inboundQueueBridge inboundQueueName="jms/queue/jreport/request"/>
        </inboundQueueBridges>
      </jmsQueueConnector>
    </jmsBridgeConnectors>

  </broker>

    <!-- Set up the template for connecting to Weblogic -->
    <bean id="remoteJndi" class="org.springframework.jndi.JndiTemplate">
        <property name="environment">
                <props>
                        <prop key="java.naming.factory.initial">weblogic.jndi.WLInitialContextFactory</prop>
                        <prop key="java.naming.provider.url">t3://<your ip here>:7001</prop>
                </props>
        </property>
    </bean>

  <bean id="localFactory"
    class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://127.0.0.1:61616" />
  </bean>

  <bean id="localQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="dynamic/jms.queue.jreport.request"/>
  </bean>
</beans>
<!-- END SNIPPET: xbean -->

示例纯 Spring 配置,用于通过桥接将消息发送到外部 ActiveMQ Classic 目标

Spring bean

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "http://www.springframework.org/dtd/spring-beans.dtd">

<beans>

  <bean id="mainBroker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
    <property name="brokerName" value = "mainBroker"/>
    <property name="persistent" value="false"/>
    <property name="transportConnectorURIs">
      <list>
        <value>tcp://127.0.0.1:7000</value>
      </list>
    </property>
  </bean>

  <bean id="bridgedBroker" class="org.apache.activemq.broker.BrokerService" init-method="start" destroy-method="stop">
    <property name="brokerName" value = "bridgedBroker"/>
    <property name="persistent" value="false"/>
    <property name="transportConnectorURIs">
      <list>
        <value>tcp://127.0.0.1:7001</value>
      </list>
    </property>
    <property name="jmsBridgeConnectors">
      <list>
        <bean class="org.apache.activemq.network.jms.JmsQueueConnector">
          <property name="outboundQueueConnectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
              <property name="brokerURL" value="tcp://127.0.0.1:7000" />
            </bean>
          </property>
          <property name="outboundQueueBridges">
            <list>
              <bean class="org.apache.activemq.network.jms.OutboundQueueBridge">
                <constructor-arg value="messages.input"/>
              </bean>
            </list>
          </property>
        </bean>
      </list>
    </property>
  </bean>

</beans>

Java 代码

public class BridgeTest {

  public BridgeTest() throws Exception {

      Log log = LogFactory.getLog(getClass());

      new ClassPathXmlApplicationContext("bridge/context-bridge.xml");

      ActiveMQConnection connection = ActiveMQConnection.makeConnection("tcp://127.0.0.1:7001");
      connection.start();
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      Destination destination = session.createQueue("messages.input");
      MessageProducer producer = session.createProducer(destination);
      producer.send(session.createTextMessage("Test Message"));
      log.debug("send message");
      session.close();
      connection.close();

      connection = ActiveMQConnection.makeConnection("tcp://127.0.0.1:7000");
      connection.start();
      session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      destination = session.createQueue("messages.input");
      MessageConsumer consumer = session.createConsumer(destination);
      log.debug("receive message");
      Message message = consumer.receive(5000);
      log.debug("Received: " + message);
      session.close();
      connection.close();
  }

  public static void main(String[] args) throws Exception {
    new BridgeTest();
  }

}

Apache、ActiveMQ、Apache ActiveMQ、Apache 羽毛徽标和 Apache ActiveMQ 项目徽标是 Apache 软件基金会的商标。版权所有 © 2024,Apache 软件基金会。根据 Apache 许可证 2.0 许可。