虽然 Apache ActiveMQ Artemis 提供了一个与 JMS 无关的消息传递 API,但许多用户会更习惯使用 JMS。

JMS 是一个非常流行的消息传递 API 标准,大多数消息传递系统都提供 JMS API。如果您完全不熟悉 JMS,我们建议您遵循 Oracle JMS 教程 - 本指南不包含完整的 JMS 教程。

Apache ActiveMQ Artemis 还附带了大量示例,其中许多示例演示了 JMS API 的用法。一个很好的起点是尝试使用简单的 JMS Queue 和 Topic 示例,但我们也提供了许多其他 JMS API 部分的示例。有关示例的完整描述,请参阅 示例

在本节中,我们将介绍为 JMS 配置服务器和创建简单的 JMS 程序的主要步骤。我们还将展示如何配置和使用 JNDI,以及如何在不使用任何 JNDI 的情况下使用 JMS 与 Apache ActiveMQ Artemis。

1. 一个简单的订购系统

在本章中,我们将使用一个非常简单的订购系统作为示例。由于其极端的简化,这是一个有点人为的例子,但它可以用来演示设置和使用 JMS 的基本知识。

我们将有一个名为 OrderQueue 的单个 JMS Queue,我们将有一个单个 MessageProducer 将订购消息发送到队列,以及一个单个 MessageConsumer 从队列中消费订购消息。

队列将是一个 持久 队列,即它将在服务器重启或崩溃后继续存在。我们还希望预部署队列,即在服务器配置中指定队列,以便它在无需我们显式地从客户端创建的情况下自动创建。

2. JNDI

JMS 规范规定,管理对象(即 JMS 队列、主题和连接工厂实例)通过 JNDI API 提供。经纪人可以根据需要实现 JNDI,只要实现符合 API 即可。Apache ActiveMQ Artemis 没有 JNDI 服务器。相反,它使用客户端 JNDI 实现,该实现依赖于环境中设置的特殊属性来构建适当的 JMS 对象。换句话说,Apache ActiveMQ Artemis 服务器上的 JNDI 中没有存储任何对象,相反,它们只是根据提供的配置在客户端上实例化。让我们看一下不同类型的管理对象以及如何配置它们。

以下配置属性在 Apache ActiveMQ Artemis 以独立模式运行时是严格要求的。当 Apache ActiveMQ Artemis 集成到应用程序服务器(例如 Wildfly)时,应用程序服务器本身几乎肯定会提供具有自身属性的 JNDI 客户端。

2.1. ConnectionFactory JNDI

JMS 连接工厂由客户端用于建立与服务器的连接。它知道要连接的服务器的位置,以及许多其他配置参数。

以下是一个简单的 JNDI 上下文环境示例,该环境用于客户端查找连接工厂以访问 Apache ActiveMQ Artemis 的嵌入实例

java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.invmConnectionFactory=vm://0

在这个实例中,我们创建了一个绑定到 invmConnectionFactory 的连接工厂,任何以 connectionFactory. 为前缀的条目都将创建一个连接工厂。

在某些情况下,可能有多个服务器实例在特定 JVM 中运行。在这种情况下,每个服务器通常都会有一个具有唯一服务器 ID 的 InVM 接收器。使用 JMS 和 JNDI 的客户端可以通过为每个服务器指定连接工厂来解决这种情况,如下所示

java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.invmConnectionFactory0=vm://0
connectionFactory.invmConnectionFactory1=vm://1
connectionFactory.invmConnectionFactory2=vm://2

以下是所有支持的 URL 方案列表

  • vm

  • tcp

  • udp

  • jgroups

大多数客户端不会连接到嵌入式代理。客户端最常通过网络连接到远程代理。以下是一个简单的示例,客户端配置连接工厂以连接到运行在 myhost:5445 上的远程代理

java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.ConnectionFactory=tcp://myhost:5445

在上面的示例中,客户端使用 tcp 方案作为提供程序 URL。客户端还可以在 URL 中指定多个用逗号分隔的 host:port 组合(例如 (tcp://remote-host1:5445,remote-host2:5445))。无论 URL 中是否包含一个或多个 host:port 组合,它们都被视为底层连接的初始连接器

还支持 udp 方案,该方案应使用与 ActiveMQ Artemis 服务器上配置的相应 broadcast-groupgroup-addressgroup-port 匹配的 host:port 组合。

每个方案都有一组特定的属性,可以使用传统的 URL 查询字符串格式(例如 scheme://host:port?key1=value1&key2=value2)来设置这些属性,以定制底层传输机制。例如,如果客户端想要使用 TCP 和 SSL 连接到远程服务器,它将创建一个如下所示的连接工厂,tcp://remote-host:5445?ssl-enabled=true

有关 tcp 方案的所有可用属性,请参阅 有关 Netty 传输的文档

注意,如果您使用 tcp 方案和多个地址,则可以将查询应用于所有 URL 或仅应用于单个连接器,因此,如果您有

  • (tcp://remote-host1:5445?httpEnabled=true,remote-host2:5445?httpEnabled=true)?clientID=1234

那么 httpEnabled 属性仅在各个连接器上设置,而 clientId 则在实际的连接工厂上设置。在整个 URI 上设置的任何连接器特定属性都将应用于所有连接器。

udp 方案支持 4 个属性

localAddress

如果您在同一台机器上运行多个网络接口,您可能希望指定发现组仅在特定接口上侦听。为此,您可以使用此参数指定接口地址。

localPort

如果您要指定数据报套接字绑定的本地端口,则可以在此处指定它。通常,您只需要使用默认值 -1,它表示应使用匿名端口。此参数始终与 localAddress 结合使用。

refreshTimeout

这是发现组在收到来自特定服务器的最后广播后等待的时间段,然后才会从其连接器对列表中删除该服务器的连接器对条目。通常,您会将其设置为明显高于广播组上的广播周期的值,否则,即使服务器仍在广播,由于计时上的细微差异,服务器也可能会间歇性地从列表中消失。此参数是可选的,默认值为 10000 毫秒(10 秒)。

discoveryInitialWaitTimeout

如果在创建后立即使用连接工厂,那么它可能没有足够的时间从集群中的所有节点接收广播。在首次使用时,连接工厂将确保它在创建后等待这么长时间,然后再创建第一个连接。此参数的默认值为 10000 毫秒。

最后,支持 jgroups 方案,它为服务器发现提供了 udp 方案的替代方案。URL 模式为 jgroups://channelName?file=jgroups-xml-conf-filename,其中 jgroups-xml-conf-filename 指的是类路径上包含 JGroups 配置的 XML 文件。channelName 是赋予创建的 jgroups 通道的名称。

udp 一样,支持 refreshTimeoutdiscoveryInitialWaitTimeout 属性。

默认连接工厂的默认类型为 javax.jms.ConnectionFactoryjakarta.jms.ConnectionFactory 类型,具体取决于您使用的客户端。可以通过设置类型来更改这一点,如下所示

java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://127.0.0.1:5445?type=CF

在这个例子中,它仍然设置为默认值,下面列出了可以设置的类型。

2.1.1. 连接工厂类型的配置

提供的接口将取决于您使用的是 JMS 还是 Jakarta Messaging 客户端实现。

type interface

CF (默认)

javax.jms.ConnectionFactoryjakarta.jms.ConnectionFactory

XA_CF

javax.jms.XAConnectionFactoryjakarta.jms.XAConnectionFactory

QUEUE_CF

javax.jms.QueueConnectionFactoryjakarta.jms.QueueConnectionFactory

QUEUE_XA_CF

javax.jms.XAQueueConnectionFactoryjakarta.jms.XAQueueConnectionFactory

TOPIC_CF

javax.jms.TopicConnectionFactoryjakarta.jms.TopicConnectionFactory

TOPIC_XA_CF

javax.jms.XATopicConnectionFactoryjakarta.jms.XATopicConnectionFactory

2.2. Destination JNDI

JMS 目标通常也通过 JNDI 查找。与连接工厂一样,可以使用 JNDI 上下文环境中的特殊属性来配置目标。属性名称应遵循以下模式:queue.<jndi-binding>topic.<jndi-binding>。属性应该是 Apache ActiveMQ Artemis 服务器托管的队列的名称。例如,如果服务器配置了如下所示的 JMS 队列

<address name="OrderQueue">
   <queue name="OrderQueue"/>
</address>

如果客户端想要将此队列绑定到“queues/OrderQueue”,那么 JNDI 属性将配置如下

java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
java.naming.provider.url=tcp://myhost:5445
queue.queues/OrderQueue=OrderQueue

也可以查找未在 JNDI 上下文环境中显式配置的 JMS 目标。这可以使用 dynamicQueues/dynamicTopics/ 在查找字符串中实现。例如,如果客户端想要查找上述的“OrderQueue”,它可以通过简单地使用字符串“dynamicQueues/OrderQueue”来实现。注意,dynamicQueues/dynamicTopics/ 之后的内容必须完全对应于服务器上的目标名称。

2.3. 代码

以下是示例代码

首先,我们将创建一个 JNDI 初始上下文,从中查找我们的 JMS 对象。如果上述属性设置在 jndi.properties 中,并且它位于类路径上,那么任何新的空 InitialContext 都将使用这些属性进行初始化

InitialContext ic = new InitialContext();

//Now we'll look up the connection factory from which we can create
//connections to myhost:5445:

ConnectionFactory cf = (ConnectionFactory)ic.lookup("ConnectionFactory");

//And look up the Queue:

Queue orderQueue = (Queue)ic.lookup("queues/OrderQueue");

//Next we create a JMS connection using the connection factory:

Connection connection = cf.createConnection();

//And we create a non transacted JMS Session, with AUTO\_ACKNOWLe.g. //acknowledge mode:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//We create a MessageProducer that will send orders to the queue:

MessageProducer producer = session.createProducer(orderQueue);

//And we create a MessageConsumer which will consume orders from the
//queue:

MessageConsumer consumer = session.createConsumer(orderQueue);

//We make sure we start the connection, or delivery won't occur on it:

connection.start();

//We create a simple TextMessage and send it:

TextMessage message = session.createTextMessage("This is an order");
producer.send(message);

//And we consume the message:

TextMessage receivedMessage = (TextMessage)consumer.receive();
System.out.println("Got order: " + receivedMessage.getText());

就这么简单。有关各种可工作的 JMS 示例,请参阅 示例

警告

请注意,JMS 连接、会话、生产者和消费者旨在重复使用

为每个要生产或消费的消息创建新的连接、会话、生产者和消费者是一个反模式。如果您这样做,您的应用程序的性能会很差。这将在性能调优部分 性能调优 中进一步讨论。

3. 不使用 JNDI 直接实例化 JMS 资源

虽然从 JNDI 查找 JMS 管理对象(即 JMS 队列、主题和连接工厂实例)是一个非常常见的 JMS 使用模式,但在某些情况下,您可能会想“为什么我需要 JNDI?为什么我不能直接实例化这些对象?”

使用 Apache ActiveMQ Artemis,您可以完全做到这一点。Apache ActiveMQ Artemis 支持直接实例化 JMS 队列、主题和连接工厂实例,因此您根本不需要使用 JNDI。

有关直接实例化的完整工作示例,请查看 直接实例化 JMS 对象 例子,该例子位于示例中的 JMS 部分。

以下是我们简单的示例,重写为完全不使用 JNDI。

我们通过 ActiveMQJMSClient 实用程序类创建 JMS ConnectionFactory 对象,请注意,我们需要提供连接参数并指定使用的传输方式,有关连接器的更多信息,请参见 配置传输方式

TransportConfiguration transportConfiguration = new TransportConfiguration(NettyConnectorFactory.class.getName());

ConnectionFactory cf = ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,transportConfiguration);

//We also create the JMS Queue object via the ActiveMQJMSClient Utility
//class:

Queue orderQueue = ActiveMQJMSClient.createQueue("OrderQueue");

//Next we create a JMS connection using the connection factory:

Connection connection = cf.createConnection();

//And we create a non transacted JMS Session, with AUTO\_ACKNOWLe.g. //acknowledge mode:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

//We create a MessageProducer that will send orders to the queue:

MessageProducer producer = session.createProducer(orderQueue);

//And we create a MessageConsumer which will consume orders from the
//queue:

MessageConsumer consumer = session.createConsumer(orderQueue);

//We make sure we start the connection, or delivery won't occur on it:

connection.start();

//We create a simple TextMessage and send it:

TextMessage message = session.createTextMessage("This is an order");
producer.send(message);

//And we consume the message:

TextMessage receivedMessage = (TextMessage)consumer.receive();
System.out.println("Got order: " + receivedMessage.getText());

4. 设置客户端 ID

这代表 JMS 客户端的客户端 ID,创建持久订阅时需要此 ID。可以在连接工厂上配置此 ID,并且可以通过 clientId 元素进行设置。此连接工厂创建的任何连接都将以此作为其客户端 ID。

5. 为 DUPS_OK 设置批处理大小

当 JMS 确认模式设置为 DUPS_OK 时,可以配置消费者以批处理方式发送确认,而不是一次发送一个,从而节省宝贵的带宽。这可以通过连接工厂上的 dupsOkBatchSize 元素进行配置,并以字节为单位设置。默认值为 1024 * 1024 字节 = 1 MiB。

6. 设置事务批处理大小

在事务中接收消息时,可以配置消费者以批处理方式发送确认,而不是单独发送,从而节省宝贵的带宽。这可以通过连接工厂上的 transactionBatchSize 元素进行配置,并以字节为单位设置。默认值为 1024 * 1024。

7. 设置目标缓存

许多框架(如 Spring)在每次操作时按名称解析目标,这会导致性能问题和对代理的额外调用,在目标(地址)是永久的代理端的情况下,例如它们由平台或运维团队管理。使用 cacheDestinations 元素,您可以切换目标缓存以提高性能并减少对代理的调用。如果目标(地址)不是永久的代理端,则不应使用此方法,因为它们是动态创建/删除的。