消息组是一组具有以下特征的消息:
-
消息组中的消息共享相同的组 ID,即它们具有相同的组标识符属性(JMS 的
JMSXGroupID
,Apache ActiveMQ Artemis 核心 API 的_AMQ_GROUP_ID
)。 -
消息组中的消息始终由同一个消费者消费,即使队列上有多个消费者。它们将具有相同组 ID 的所有消息固定到同一个消费者。如果该消费者关闭,则会选择另一个消费者,并接收具有相同组 ID 的所有消息。
当您希望某个属性特定值的全部消息都由同一个消费者按顺序处理时,消息组很有用。
例如,某个股票的订单。您可能希望任何特定股票的订单都由同一个消费者按顺序处理。为此,您可以创建一个消费者池(可能每个股票一个,但更少的消费者也能工作),然后将股票名称设置为 _AMQ_GROUP_ID
属性的值。
这将确保特定股票的所有消息始终由同一个消费者处理。
由于队列的底层 FIFO 语义,分组消息可能会影响非分组消息的并发处理。例如,如果队列头部有一批 100 个分组消息,后面跟着 1000 个非分组消息,那么所有分组消息都需要发送到相应的客户端(按顺序消费这些分组消息的客户端),然后才能消费任何非分组消息。这种情况下的功能影响是,在处理所有分组消息时,并发消息处理暂时挂起。这可能是性能瓶颈,因此在确定消息组大小,以及是否应将分组消息与非分组消息隔离时要牢记这一点。 |
1. 使用核心 API
用于识别消息组的属性名为 "_AMQ_GROUP_ID"
(或常量 MessageImpl.HDR_GROUP_ID
)。或者,您可以在 SessionFactory
上将 autogroup
设置为 true,它将选择一个随机的唯一 ID。
2. 使用 JMS
用于识别消息组的属性名为 JMSXGroupID
。
// send 2 messages in the same group to ensure the same
// consumer will receive both
Message message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
producer.send(message);
message = ...
message.setStringProperty("JMSXGroupID", "Group-0");
producer.send(message);
或者,您可以在 ActiveMQConnectonFactory
上将 autogroup
设置为 true,它将选择一个随机的唯一 ID。这也可以在 JNDI 上下文环境中设置,例如 jndi.properties
。以下是一个使用默认情况下在上下文中可用的“ConnectionFactory”连接工厂的简单示例
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.myConnectionFactory=tcp://127.0.0.1:61616?autoGroup=true
或者,您可以通过连接工厂设置组 ID。所有使用通过此连接工厂创建的生产者发送的消息都将在发送的所有消息上将 JMSXGroupID
设置为指定的值。这也可以在 JNDI 上下文环境中设置,例如 jndi.properties
。以下是一个使用默认情况下在上下文中可用的“ConnectionFactory”连接工厂的简单示例
java.naming.factory.initial=org.apache.activemq.artemis.jndi.ActiveMQInitialContextFactory
connectionFactory.myConnectionFactory=tcp://127.0.0.1:61616?groupID=Group-0
3. 关闭消息组
通常,您不需要关闭消息组,只需继续使用它即可。但是,如果您确实想关闭一个组,您可以添加一个负序列号。
示例
Message message = session.createTextMessage("<foo>hey</foo>");
message.setStringProperty("JMSXGroupID", "Group-0");
message.setIntProperty("JMSXGroupSeq", -1);
...
producer.send(message);
这会关闭消息组,因此如果将来发送具有相同消息组 ID 的另一条消息,它将被重新分配给一个新的消费者。
4. 通知消费者组所有权变更
ActiveMQ 支持在发送到特定消息组的消费者的第一条消息上添加一个布尔标头。
要启用此功能,您必须设置一个标头键,代理将使用该键设置标志。
在示例中,我们使用 JMSXGroupFirstForConsumer
,但它可以是您想要的任何标头键值。
通过在队列级别将 group-first-key
设置为 JMSXGroupFirstForConsumer
,每次将新组分配给消费者时,JMSXGroupFirstForConsumer
标头将在第一条消息上设置为 true
。
<address name="foo.bar">
<multicast>
<queue name="orders1" group-first-key="JMSXGroupFirstForConsumer"/>
</multicast>
</address>
或者,在使用核心 JMS 客户端时,在自动创建时,通过使用地址参数创建消费者使用的目标。
Queue queue = session.createQueue("my.destination.name?group-first-key=JMSXGroupFirstForConsumer");
Topic topic = session.createTopic("my.destination.name?group-first-key=JMSXGroupFirstForConsumer");
此外,可以使用 address-setting
配置为地址下所有队列的默认值设置默认值。
<address-setting match="my.address">
<default-group-first-key>JMSXGroupFirstForConsumer</default-group-first-key>
</address-setting>
默认情况下,此功能处于关闭状态。
5. 重新平衡消息组
有时,在添加新的消费者后,您会发现它们没有分配任何组,因此没有被使用。这是因为所有组都已分配给现有的消费者。但是,可以重新平衡组,以便队列上的所有消费者都分配到一个或多个组。
在重置的确切时刻,发送到最初关联消费者的消息可能正在传输中,同时向新关联的消费者分发了相同组的新消息。 |
5.1. 手动
通过调用关联队列上的 resetAllGroups
,使用管理 API(例如通过 Web 控制台)。
5.2. 自动
通过在队列级别将 group-rebalance
设置为 true
,每次添加消费者时,都会触发组的重新平衡/重置。
如上所述,在执行组重新平衡时,您可能会遇到正在处理中的消息。默认情况下,代理将在重新平衡期间继续分发。要确保在重新平衡后向不同消费者分发新消息之前处理正在处理中的消息,您可以将 group-rebalance-pause-dispatch
设置为 true
,这将导致分发暂停,直到所有正在处理中的消息都被处理完为止。
<address name="foo.bar">
<multicast>
<queue name="orders1" group-rebalance="true" group-rebalance-pause-dispatch="true"/>
</multicast>
</address>
或者,在使用核心 JMS 客户端时,在自动创建时,通过使用地址参数创建消费者使用的目标。
Queue queue = session.createQueue("my.destination.name?group-rebalance=true&group-rebalance-pause-dispatch=true");
Topic topic = session.createTopic("my.destination.name?group-rebalance=true&group-rebalance-pause-dispatch=true");
此外,可以使用 address-setting
配置为地址下所有队列的默认值设置默认值。
<address-setting match="my.address">
<default-group-rebalance>true</default-group-rebalance>
<default-group-rebalance-pause-dispatch>true</default-group-rebalance-pause-dispatch>
</address-setting>
默认情况下,default-group-rebalance
为 false
,表示此功能已禁用/关闭。默认情况下,default-group-rebalance-pause-dispatch
为 false
,表示此功能已禁用/关闭。
6. 组桶
为了在具有有限内存的队列中处理组,并允许更好地扩展组,您可以启用组桶,本质上是将组 ID 散列到桶中,而不是跟踪每个组 ID。
将 group-buckets
设置为 -1
将保留默认行为,这意味着队列会跟踪每个组,但会遇到无界内存使用问题。
将 group-buckets
设置为 0
将在队列上禁用分组(0 个桶)。这在多播地址上可能很有用,在多播地址上存在许多队列,但您可能不关心一个队列的排序,并且希望保持循环行为。
有几种方法可以设置 group-buckets
。
<address name="foo.bar">
<multicast>
<queue name="orders1" group-buckets="1024"/>
</multicast>
</address>
通过使用 CORE API 创建队列,指定参数 group-buckets
为 20
。
或者,在使用 JMS 客户端时,在自动创建时,通过使用地址参数创建消费者使用的目标。
Queue queue = session.createQueue("my.destination.name?group-buckets=1024");
Topic topic = session.createTopic("my.destination.name?group-buckets=1024");
此外,可以使用 address-setting
配置为地址下所有队列的默认值设置默认值。
<address-setting match="my.bucket.address">
<default-group-buckets>1024</default-group-buckets>
</address-setting>
默认情况下,default-group-buckets
为 -1
,这是为了保持与现有默认行为的兼容性。
可以使用地址 通配符 为一组地址配置 group-buckets
。
8. 集群分组
在查看集群分组支持配置的详细信息之前,值得研究一下集群分组的整体概念。一般来说,将集群和消息分组结合在一起是一个糟糕的选择,因为分组消息(即有序消息)的基本概念和通过集群进行横向扩展的基本概念实际上是相互矛盾的。
消息分组强制执行有序消息消费。有序消息消费要求在消费组中的下一条消息之前,必须完全消费和确认每条消息。这会导致串行消息处理(即没有并发)。
但是,集群的概念是通过添加可以并发处理消息的消费者来横向扩展代理,以提高消息吞吐量。但是,由于消息组是有序的,因此每个组中的消息不能并发消费,这会破坏横向扩展的目的。
由于这些原因,不建议使用集群分组。
但是,如果您在考虑这些设计注意事项的情况下评估了您的整体用例,并确定集群分组仍然可行,那么请继续阅读以了解所有配置详细信息和最佳实践。
8.1. 集群分组配置
在集群中使用消息组要复杂一些。这是因为具有特定组 ID 的消息可能会到达任何节点,因此每个节点都需要知道哪些组 ID 绑定到哪个节点上的哪个消费者。处理特定组 ID 消息的消费者可能在集群的不同节点上,因此每个节点都需要知道此信息,以便它可以将消息正确地路由到具有该消费者的节点。
为了解决这个问题,引入了分组处理程序的概念。每个节点将有自己的分组处理程序,当发送具有分配的组 ID 的消息时,处理程序将决定应使用哪条路由。
以下是一些针对每种处理程序类型的配置示例。这应该在 broker.xml
中配置。
<grouping-handler name="my-grouping-handler">
<type>LOCAL</type>
<address>jms</address>
<timeout>5000</timeout>
</grouping-handler>
<grouping-handler name="my-grouping-handler">
<type>REMOTE</type>
<address>jms</address>
<timeout>5000</timeout>
</grouping-handler>
- 类型
-
支持两种类型的处理程序 -
LOCAL
和REMOTE
。每个集群应选择 1 个节点具有LOCAL
分组处理程序,所有其他节点应具有REMOTE
处理程序。是LOCAL
处理程序实际上决定了应使用哪条路由,所有其他REMOTE
处理程序都会与它进行通信。 - 地址
-
引用 集群连接及其使用的地址。请参阅集群部分了解如何配置集群。
- 超时
-
等待做出决定的时间。如果达到此超时时间,将在发送期间抛出异常,这确保了严格的排序。
消息路由的决定最初由接收消息的节点提出。该节点将根据正常的集群路由条件选择合适的路由,即轮询可用队列、优先使用本地队列,并选择具有消费者的队列。如果分组处理程序接受该提议,则该节点将从那时起将消息路由到该队列,如果拒绝,则将提供备用路由,该节点将再次无限期地路由到该队列。所有其他节点也将路由到提议时选择的队列。消息到达队列后,正常的单服务器消息组语义将生效,消息将被固定到该队列上的消费者。
您可能已经注意到,单一本地处理程序存在单点故障。如果该节点崩溃,则无法做出任何决定。任何发送的消息都不会被传递,并且会抛出异常。为了避免这种情况发生,可以在另一个备份节点上复制本地处理程序。只需创建您的备份节点,并使用相同的本地处理程序对其进行配置。
8.2. 集群分组最佳实践
使用集群分组时,应遵循一些最佳实践。
-
如果可能,请确保您的消费者均匀分布在不同的节点上。这只有在您定期创建和关闭消费者时才会成为问题。由于消息一旦固定就会始终路由到同一个队列,因此从该队列中删除消费者可能会导致该队列没有消费者,这意味着该队列将继续接收消息。避免关闭消费者,或者确保您始终拥有足够的消费者,例如,如果您有 3 个节点,则拥有 3 个消费者。
-
如果可能,请使用持久队列。如果在组绑定到队列后删除队列,则其他节点可能仍然尝试将消息路由到该队列。可以通过确保由发送消息的会话删除该队列来避免这种情况。这意味着当发送下一条消息时,该消息将被发送到删除该队列的节点,这意味着可以成功地进行新的提议。或者,您可以直接开始使用不同的组 ID。
-
始终确保具有本地分组处理程序的节点被复制。这意味着在故障转移时仍会发生分组。
-
如果您使用组超时,远程节点的组超时应该更短,至少是主协调器上值的一半。这是因为这将决定上次使用值在节点之间请求组的往返过程中更新的频率。
8.3. 集群分组示例
请参见 集群分组示例,该示例演示了如何在 ActiveMQ Artemis 集群中配置消息组。