Apache ActiveMQ Artemis 支持的每种消息传递协议和 API 都定义了不同的消息传递资源集。
-
JMS 使用 *队列* 和 *主题*
-
STOMP 使用通用 *目标*
-
MQTT 使用 *主题*
-
AMQP 使用通用 *节点*
为了处理这些独特语义和用例,代理提供了一种灵活且强大的地址模型,该模型基于以下 *核心* 资源集
-
地址
-
队列
-
路由类型
1. 地址
消息被 *发送* 到地址。地址被赋予一个唯一的名称、一个路由类型和零个或多个队列。
2. 队列
消息被 *消费* 从队列。队列绑定到地址。它被赋予一个唯一的名称和一个路由类型。一个地址可以绑定零个或多个队列。当消息发送到地址时,它会根据配置的路由类型路由到一个或多个队列。
队列的名称必须是 *全局* 唯一的。例如,您不能在地址 a1
上有一个名为 q1
的队列,并且在地址 a2
上也有一 个名为 q1
的队列。
3. 路由类型
路由类型决定了消息如何从地址路由到绑定到该地址的队列。支持两种不同的路由类型,**任意播** 和 **多播**。
如果您想让您的消息路由到… | 使用此路由类型… |
---|---|
地址上的单个队列 |
任意播 |
地址上的每个队列 |
多播 |
可以为同一个地址定义具有不同路由类型的队列,但这通常会导致反模式,因此不建议这样做。 |
4. 自动配置
默认情况下,Apache ActiveMQ Artemis 会自动创建地址和队列以支持您正在使用的任何协议的语义。代理了解如何使用核心资源支持每个协议的功能,因此在大多数情况下不需要手动配置。这样可以避免您在客户端连接到地址和队列之前预先配置每个地址和队列。
代理可以选择配置为在地址和队列不再使用时自动删除它们。
自动创建和删除是在每个地址的基础上配置的,并由以下 address-setting
元素控制
-
auto-create-addresses
-
auto-delete-addresses
-
default-address-routing-type
-
auto-create-queues
-
auto-delete-queues
-
default-queue-routing-type
自动队列创建用于 *在正常操作期间不会创建* 的队列。例如,当远程应用程序在 JMS 主题上创建消费者时,将创建一个代表该订阅的队列,如 JMS 到核心映射章节 中所述。此队列将被创建,而不管 auto-create-queues 如何配置,因为它对于 *正常操作* 是必需的。 |
有关这些元素的更多详细信息,请参阅 有关地址设置的文档。
当然,可以禁用自动配置,并且所有内容都可以手动配置。请继续阅读有关手动配置的更多详细信息。
5. 基本手动配置
以下示例展示了如何为基本任意播和多播用例配置资源。
这些用例的许多细节与协议无关。这里的目标是演示和解释基本配置元素以及地址模型的工作原理。 |
5.1. 任意播
任意播语义最常见的用例(有时称为 点对点)涉及应用程序遵循“竞争消费者”模式来接收来自共享队列的消息。接收消息的消费者越多,总消息吞吐量就越大。多个 Java 应用程序共享 JMS 队列是这种用例的典型示例。
在这种用例中,代理被配置为(例如)使用 anycast
路由类型和一个队列 q1
来配置一个地址 address.foo
。当生产者将消息发送到 address.foo
时,它将被路由到 q1
,然后最终分派给其中一个消费者。
图 1. 任意播
此用例的配置在 etc/broker.xml
中将如下所示
<addresses>
<address name="address.foo">
<anycast>
<queue name="q1"/>
</anycast>
</address>
</addresses>
对于大多数支持此类用例的协议和 API(例如 JMS、AMQP 等),在发送和消费消息时使用 *相同名称* 是惯例。在这种情况下,您将使用如下配置
<addresses>
<address name="orderQueue">
<anycast>
<queue name="orderQueue"/>
</anycast>
</address>
</addresses>
5.2. 多播
多播语义最常见的用例(有时称为 发布/订阅 或“发布/订阅”)涉及每个应用程序接收发送到地址的每条消息。多个应用程序从 JMS 主题中消费是这种用例的典型示例。MQTT 订阅是多播语义的另一个支持示例。
在这种用例中,代理使用 multicast
路由类型和两个队列 q1
和 q2
来配置一个地址 address.foo
。当生产者将消息发送到 address.foo
时,它将被路由到 *两个* q1
和 q2
,因此最终两个消费者都将收到相同的消息。
图 2. 多播
此用例的配置在 etc/broker.xml
中将如下所示
<addresses>
<address name="address.foo">
<multicast>
<queue name="q1"/>
<queue name="q2"/>
</multicast>
</address>
</addresses>
这种基本配置简单直接,但有一个问题。在 JMS 主题或 MQTT 等正常的发布/订阅用例中, *预先不知道* 订阅者的数量。在这种情况下,建议使用以下配置
<addresses>
<address name="address.foo">
<multicast/>
</address>
</addresses>
定义 <multicast/>
而不带任何队列,代理将在消费者连接到 address.foo
时为每个订阅自动创建队列。然后,当消息发送到 address.foo
时,它将被路由到每个订阅者的每个队列,因此每个订阅者都会收到每条消息。这些队列通常称为 *订阅队列*,因为原因很明显。
这些订阅队列通常根据用于创建它们的协议的语义命名。例如,JMS 支持持久订阅和非持久订阅。非持久订阅的队列使用 UUID 命名,但持久订阅使用的队列根据 JMS“客户端 ID”和“订阅名称”命名。AMQP、MQTT、STOMP 等也使用类似的约定。
6. 高级手动配置
6.1. 完整限定队列名称
在大多数情况下,静态配置上述订阅队列是没有必要或不必要的。但是,在某些情况下,用户可能希望静态配置订阅队列,然后使用 **完整限定队列名称** (FQQN) 直接连接到该队列。
FQQN 使用特殊语法来指定 *地址* 和 *队列*,以便使用不原生理解地址/队列分离的协议和 API(例如 AMQP、JMS 等)的应用程序可以直接发送消息或订阅 *到* 队列,而不是局限于地址。应用程序只需使用地址名称和队列名称,用 ::
(例如 address::queue
)分隔即可。
在此示例中,地址 a1
使用两个队列配置:q1
、q2
,如下面的配置所示。
<addresses>
<address name="a1">
<multicast>
<queue name="q1" />
<queue name="q2" />
</multicast>
</address>
</addresses>
以下是用 JMS 的 Java 代码片段演示了 FQQN 语法
Queue q1 session.createQueue("a1::q1");
MessageConsumer consumer = session.createConsumer(q1);
字符串 :: 仅应用于 FQQN,而不要在地址或队列名称中的任何其他上下文中使用。 |
以下示例展示了如何使用代理端配置来静态配置具有发布订阅行为的队列,用于共享、非共享、持久和非持久订阅行为。
6.1.1. 使用 max-consumers
的共享持久订阅队列
队列的默认行为是不限制连接的队列消费者数量。队列元素的 max-consumers
参数可用于限制任何时刻允许的连接消费者数量。
打开文件 etc/broker.xml
进行编辑。
<addresses>
<address name="durable.foo">
<multicast>
<!-- pre-configured shared durable subscription queue -->
<queue name="q1" max-consumers="10">
<durable>true</durable>
</queue>
</multicast>
</address>
</addresses>
6.1.2. 非共享持久订阅队列
代理可以配置为防止多个消费者同时连接到一个队列。因此,对这样配置的队列的订阅是“非共享的”。要做到这一点,只需将 max-consumers
参数设置为 1
<addresses>
<address name="durable.foo">
<multicast>
<!-- pre-configured non shared durable subscription queue -->
<queue name="q1" max-consumers="1">
<durable>true</durable>
</queue>
</multicast>
</address>
</addresses>
6.1.3. 非持久订阅队列
非持久订阅通常由相关协议管理器管理,通过创建和删除临时队列来管理。
如果用户需要预先创建一个行为类似于非持久订阅队列的队列,可以在队列上启用 purge-on-no-consumers
标志。当 purge-on-no-consumers
设置为 true
时,该队列将不会开始接收消息,直到有消费者连接。当最后一个消费者从队列中分离时,该队列将被清除(其消息将被删除),并且不会接收任何更多消息,直到有新的消费者连接。
打开文件 etc/broker.xml
进行编辑。
<addresses>
<address name="non.shared.durable.foo">
<multicast>
<queue name="orders1" purge-on-no-consumers="true"/>
</multicast>
</address>
</addresses>
6.2. 禁用队列
如果用户需要静态配置队列并禁用路由到该队列,例如,需要定义队列以便消费者可以绑定,但您希望暂时禁用消息路由到该队列。
或者,您需要停止消息流到队列以允许调查并保持消费者绑定,但不想让更多消息路由到队列以避免消息积压。
当 enabled
设置为 true
时,队列将接收消息路由到它。(默认)
当 enabled
设置为 false
时,队列将 *不会* 接收消息路由到它。
打开文件 etc/broker.xml
进行编辑。
<addresses>
<address name="foo.bar">
<multicast>
<queue name="orders1" enabled="false"/>
</multicast>
</address>
</addresses>
禁用地址上的所有队列意味着发送到该地址的任何消息都将被静默丢弃。 |
6.3. 临时队列
对于某些仅支持单片“目标”而没有地址/队列分离的协议和 API(例如 AMQP、JMS 等),代理会使用 UUID(即通用唯一标识符)作为地址和队列的名称来创建临时队列。由于名称是 UUID,因此无法为其创建address-setting
,其match
除了#
之外别无选择。
为了解决这个问题,可以在broker.xml
中指定temporary-queue-namespace
,然后创建一个address-setting
,其match
值对应于配置的temporary-queue-namespace
。当设置了temporary-queue-namespace
并创建了临时队列时,代理会将temporary-queue-namespace
值以及在wildcard-addresses
中配置的delimiter
值(默认为.
)追加到地址名称,并使用该名称来查找关联的address-setting
值。
以下是一个简单的示例配置
<temporary-queue-namespace>temp</temporary-queue-namespace>
<address-settings>
<address-setting match="temp.#">
<enable-metrics>false</enable-metrics>
</address-setting>
</address-settings>
使用此配置,任何临时队列都将禁用指标。
此设置不会更改临时队列的实际名称。它只更改用于查找地址设置的名称。 |
7. 如何过滤消息
Apache ActiveMQ Artemis 支持使用 过滤器表达式 过滤消息。
过滤器可以在两个地方应用 - 在队列上和在消费者上。
在队列上过滤消息比在消费者上过滤消息性能更高,因为不需要扫描消息。但是,队列过滤器通常没有那么灵活。
7.1. 队列过滤器
当将过滤器应用于队列时,消息在路由到队列之前会被过滤。要添加过滤器,请在配置队列时使用filter
元素,例如
<addresses>
<address name="filter">
<anycast>
<queue name="filter">
<filter string="color='red'"/>
</queue>
</anycast>
</address>
</addresses>
上面定义的过滤器确保只有具有属性"color='red'"
的消息会被发送到此队列。
7.2. 消费者过滤器
消费者过滤器在消息路由到队列之后应用,并使用相应的客户端 API 定义。以下 JMS 示例展示了消费者过滤器的工作原理。
在etc/broker.xml
中定义一个具有单个队列的地址,不应用任何过滤器。
<addresses>
<address name="filter">
<anycast>
<queue name="filter"/>
</anycast>
</address>
</addresses>
然后向队列发送一些消息。
...
// Send some messages
for (int i = 0; i < 3; i ++) {
TextMessage redMessage = senderSession.createTextMessage("Red");
redMessage.setStringProperty("color", "red");
producer.send(redMessage)
TextMessage greenMessage = senderSession.createTextMessage("Green");
greenMessage.setStringProperty("color", "green");
producer.send(greenMessage)
}
此时,队列将包含 6 条消息:红色、绿色、红色、绿色、红色、绿色。
创建具有过滤器color='red'
的消费者。
MessageConsumer redConsumer = redSession.createConsumer(queue, "color='red'");
redConsumer
具有一个仅匹配“红色”消息的过滤器。redConsumer
将接收 3 条消息。
red, red, red
现在,结果队列将为
green, green, green
8. 确定路由类型的其他方法
通常,路由类型由静态 XML 配置或用于 自动地址和队列创建 的default-address-routing-type
和default-queue-routing-type
address-setting
元素确定。但是,还有另外两种方法可以指定路由类型
-
客户端应用程序在发送消息或创建消费者时可以使用的可配置前缀
-
客户端应用程序可以在其发送的消息上设置的属性
8.1. 使用前缀确定路由类型
这些前缀使用acceptor
的 URL 中的anycastPrefix
和multicastPrefix
参数配置。当需要多个值时,可以使用逗号将它们分隔开。
8.1.1. 配置 Anycast 前缀
在etc/broker.xml
中,将anycastPrefix
添加到所需acceptor
的 URL 中。在下面的示例中,acceptor 配置为使用queue/
作为anycastPrefix
。如果客户端想要使用 Anycast 路由,则客户端代码可以指定queue/foo/
。
<acceptor name="artemis">tcp://0.0.0.0:61616?protocols=AMQP;anycastPrefix=queue/</acceptor>
例如,考虑一个 STOMP 客户端,它想要使用 Anycast 语义将消息发送到一个不存在的队列。另外,还要考虑代理被配置为自动创建地址和队列,但default-address-routing-type
和default-queue-routing-type
均为MULTICAST
。由于anycastPrefix
为queue/
,因此它只需将消息发送到queue/foo
,代理会自动创建一个名为foo
的地址,以及一个同样名为foo
的 Anycast 队列。
8.1.2. 配置 Multicast 前缀
在etc/broker.xml
中,将multicastPrefix
添加到所需acceptor
的 URL 中。在下面的示例中,acceptor 配置为使用topic/
作为multicastPrefix
。如果客户端想要使用 Multicast 路由,则客户端代码可以指定topic/foo/
。
<acceptor name="artemis">tcp://0.0.0.0:61616?protocols=AMQP;multicastPrefix=topic/</acceptor>
例如,考虑一个 STOMP 客户端,它想要在一个不存在的地址上创建具有 Multicast 语义的订阅。另外,还要考虑代理被配置为自动创建地址和队列,但default-address-routing-type
和default-queue-routing-type
均为ANYCAST
。由于multicastPrefix
为topic/
,因此它只需订阅topic/foo
,代理会自动创建一个名为foo
的地址,以及一个用于订阅的 Multicast 队列。发送到foo
的任何消息都会路由到订阅队列。
8.2. 使用消息属性确定路由类型
AMQ_ROUTING_TYPE
属性表示一个byte
值,代理将在消息_发送_时使用该值来确定路由类型。使用0
表示 Anycast 路由,使用1
表示 Multicast 路由。
消息将仅路由到与其_AMQ_ROUTING_TYPE 属性值匹配的队列(如果有)。例如,如果具有_AMQ_ROUTING_TYPE 值为1 (即 Multicast)的消息发送到一个只有 Anycast 队列的地址,则消息实际上不会路由到任何队列,因为路由类型不匹配。如果没有设置_AMQ_ROUTING_TYPE ,则消息将根据队列的路由语义路由到地址上的所有队列。 |