消息组
消息组
消息组是对 独占消费者 功能的增强。它们提供
- 保证跨单个队列处理相关消息的顺序。
- 跨多个消费者处理消息的负载均衡。
- 高可用性 / 如果 JVM 崩溃,则自动故障转移到其他消费者。
因此,从逻辑上讲,消息组就像一个并行的 独占消费者。与其将所有消息发送到单个消费者,不如使用标准 JMS 标头 JMSXGroupID
来定义消息属于哪个 消息组。然后,消息组功能确保所有属于 相同 消息组的消息都将发送到 相同 的 JMS 消费者 - 在该消费者保持活动状态的同时。一旦消费者死亡,就会选择另一个消费者。
另一种解释消息组的方法是,它提供了跨消费者进行粘性负载均衡的消息;其中 JMSXGroupID
类似于 HTTP 会话 ID 或 Cookie 值,而消息代理则充当 HTTP 负载均衡器。
用例示例
假设我们正在做某种订单匹配系统,人们在其中买卖东西(股票、股份、在线投注等)。你想要拥有匹配不同商品(股票 / 投注)的竞价和报价的消费者,因此他们想要为了性能而在 RAM 中保留数据的子集。因此,将 JMSXGroupID
设置为 MSFT
、IBM
、SUNW
等等,使用股票代码来定义消息组。(它可以是任何字符串;可以将交易簿、交易所、日期等组合在一起 - 组 ID 越具体,并发运行的次数就越多)。假设我们正在买卖 MSFT
、IBM
、SUNW
股票;消息组功能保证所有 MSFT
消息将按顺序由同一个消费者处理;对于 IBM
和 SUNW
也是如此。
消息组的工作原理
当消息被分发到消费者时,会检查 JMSXGroupID
。如果存在,则代理会检查是否有消费者拥有该消息组。由于可能存在大量消息组,因此使用哈希桶而不是实际的 JMSXGroupID
字符串。
如果没有消费者与消息组关联,则会选择一个消费者。该 JMS MessageConsumer
将接收具有相同 JMSXGroupID
值的所有后续消息,直到
- 消费者关闭(或创建消费者的客户端死亡等)。
- 有人通过发送
JMSXGroupSeq
为负值的邮件关闭消息组(有关更多详细信息,请参见下文)。
注意:与消息选择器匹配一样,基于 JMSXGroupID
的分组发生在内存中消息分发之前。使用默认的 maxPageSize
选项,如果目标一个组的大量消息积压无法全部容纳在内存中,则可能会阻止接收发送到其他组的消息。您可以更改目标的默认 maxPageSize
设置,如下所示
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" maxPageSize="1000"/>
</policyEntries>
</policyMap>
</destinationPolicy>
使用消息组
你只需要更改 JMS 生产者,以便用你选择的任何 String
值填充 JMSXGroupID
消息头。
示例
Mesasge message = session.createTextMessage("<foo>hey</foo>");
message.setStringProperty("JMSXGroupID", "IBM\_NASDAQ\_20/4/05");
...
producer.send(message);
关闭消息组
你通常不需要关闭消息组;只需继续使用它。但是,如果你确实想要关闭组,可以添加一个负序列号。
示例
Mesasge message = session.createTextMessage("<foo>hey</foo>");
message.setStringProperty("JMSXGroupID", "IBM\_NASDAQ\_20/4/05");
message.setIntProperty("JMSXGroupSeq", -1);
...
producer.send(message);
这将 关闭 消息组,因此如果将来使用相同的消息组 ID 发送另一条消息,它将被重新分配给一个新的消费者。
影响
消息组意味着你可以获得跨消费者集群进行消息 网格 处理的强大功能,包括可靠性、自动故障转移、负载均衡,但你还可以对消息处理进行排序。因此,它兼具两者的优点。但是,使用上面的示例,消息组实际上所做的事情是使用用户定义的划分策略(JMSXGroupID
值)跨消费者划分工作负载。
最棒的是,你可以做一些很酷的事情,例如使用大量的 RAM 缓存;在 MSFT
消费者中保留 MSFT
的 RAM 中的顺序;在 IBM
消费者中保留 IBM
的 RAM 中的顺序 - 由于消息代理为你进行了划分,你无需依赖于具有缓存间同步和锁定的分布式缓存来利用缓存。
最棒的是,对于应用程序开发人员来说,它看起来像一个简单的 1 个消费者世界,你可以在其中处理消息并完成工作;让代理为你完成所有困难的事情
- 划分流量
- 消息组跨消费者的负载均衡
- 随着消费者来来去去,组的自动故障转移到不同的消费者
总之,如果排序或每条消息的缓存和同步对你很重要,我们强烈建议你使用消息组来划分流量。
获取有关消息组所有权更改的通知
ActiveMQ Classic 支持一个名为 JMSXGroupFirstForConsumer
的布尔标头。此标头在发送给消费者以用于特定消息组的第一条消息上设置。
如果 JMS 连接使用 failover:
并且发生临时网络错误,导致连接断开与代理的连接并在一段时间后重新连接,则 JMS 客户端的幕后会创建一个新的消费者实例,从而可能导致对相同消息组设置具有此标头的另一条消息。
示例
String groupId = message.getStringProperty("JMSXGroupId");
if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) {
// flush cache for groupId
}
刷新缓存以确保在遇到网络错误时保持一致状态。
添加新消费者
如果你在代理中存在现有消息,并且在稍后阶段添加了消费者,最好延迟消息分发启动,直到所有消费者都存在(或者至少留出足够的时间让他们订阅)。如果你没有这样做,第一个消费者可能会获取所有消息组,所有消息都将分发给它。你可以通过使用 consumersBeforeDispatchStarts
和 timeBeforeDispatchStarts
目标策略 来实现这一点。
当 consumersBeforeDispatchStarts 和 timeBeforeDispatchStarts 都设置为大于零的值时,分发将在所需数量的消费者出现或 timeBeforeDispatchStarts 超时到期后立即开始。如果仅设置 consumersBeforeDispatchStarts,则消费者连接的超时时间为 1 秒。如果所有消费者都断开连接,则消息分发延迟将在下一个消费者连接时再次应用。
以下是对延迟 200ms
分发的目标策略的示例
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" timeBeforeDispatchStarts="200"/>
</policyEntries>
</policyMap>
</destinationPolicy>
以下代码片段显示了如何在分发开始之前等待两个消费者(或两秒钟)
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="2000"/>
</policyEntries>
</policyMap>
</destinationPolicy>
正如 相应的测试用例 所示,在分发之前添加一个小的暂停时间或设置最小消费者数量,可以确保消息组的均匀分布。
如果你需要出于任何原因手动重新平衡消息组,可以通过对相应队列的 JMX MBean 执行 removeAllMessageGroups
操作来实现。
内存消耗、负载均衡、复杂性等的竞争需求。
名为 CachedMessageGroupMap
的默认行为仅限于 LRU 缓存中的 1024 个消息组,可能与你对消息顺序的预期不符。 CachedMessageGroupMap
的内存使用量有限,但仅跟踪最多 1024 个(或配置的最大大小)组,然后会丢失对任何比最新 1024 个更旧的组的跟踪。这样,如果组数超过最大值,最旧组的顺序将丢失。
通常,用户会关闭组,以便可以将内存中的集合保留在配置的限制之下。在 AMQ-6851 中有一些有用的讨论。
为了防止这种限制,可以使用 MessageGroupHashBucket
或 SimpleMessageGroupMap
。它们的工作方式是将每个组与一个消费者关联。
SimpleMessageGroupMap
会跟踪每个组,但内存使用量不受限制。
以下代码片段显示了如何启用它
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">">
<messageGroupMapFactory>
<simpleMessageGroupMapFactory/>
</messageGroupMapFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
MessageGroupHashBucked
会跟踪每个组,并且内存使用量有限。以下代码片段显示了如何启用它
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">">
<messageGroupMapFactory>
<messageGroupHashBucked cachedSize=1024 />
</messageGroupMapFactory>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>