消息组

功能 > 消费者功能 > 消息组

消息组

消息组是对 独占消费者 功能的增强。它们提供

  • 保证跨单个队列处理相关消息的顺序。
  • 跨多个消费者处理消息的负载均衡。
  • 高可用性 / 如果 JVM 崩溃,则自动故障转移到其他消费者。

因此,从逻辑上讲,消息组就像一个并行的 独占消费者。与其将所有消息发送到单个消费者,不如使用标准 JMS 标头 JMSXGroupID 来定义消息属于哪个 消息组。然后,消息组功能确保所有属于 相同 消息组的消息都将发送到 相同 的 JMS 消费者 - 在该消费者保持活动状态的同时。一旦消费者死亡,就会选择另一个消费者。

另一种解释消息组的方法是,它提供了跨消费者进行粘性负载均衡的消息;其中 JMSXGroupID 类似于 HTTP 会话 ID 或 Cookie 值,而消息代理则充当 HTTP 负载均衡器。

用例示例

假设我们正在做某种订单匹配系统,人们在其中买卖东西(股票、股份、在线投注等)。你想要拥有匹配不同商品(股票 / 投注)的竞价和报价的消费者,因此他们想要为了性能而在 RAM 中保留数据的子集。因此,将 JMSXGroupID 设置为 MSFTIBMSUNW 等等,使用股票代码来定义消息组。(它可以是任何字符串;可以将交易簿、交易所、日期等组合在一起 - 组 ID 越具体,并发运行的次数就越多)。假设我们正在买卖 MSFTIBMSUNW 股票;消息组功能保证所有 MSFT 消息将按顺序由同一个消费者处理;对于 IBMSUNW 也是如此。

消息组的工作原理

当消息被分发到消费者时,会检查 JMSXGroupID。如果存在,则代理会检查是否有消费者拥有该消息组。由于可能存在大量消息组,因此使用哈希桶而不是实际的 JMSXGroupID 字符串。

如果没有消费者与消息组关联,则会选择一个消费者。该 JMS MessageConsumer 将接收具有相同 JMSXGroupID 值的所有后续消息,直到

  • 消费者关闭(或创建消费者的客户端死亡等)。
  • 有人通过发送 JMSXGroupSeq 为负值的邮件关闭消息组(有关更多详细信息,请参见下文)。

注意:与消息选择器匹配一样,基于 JMSXGroupID 的分组发生在内存中消息分发之前。使用默认的 maxPageSize 选项,如果目标一个组的大量消息积压无法全部容纳在内存中,则可能会阻止接收发送到其他组的消息。您可以更改目标的默认 maxPageSize 设置,如下所示

<destinationPolicy>
    <policyMap>
       <policyEntries>
           <policyEntry queue=">" maxPageSize="1000"/>
       </policyEntries>
    </policyMap>
</destinationPolicy>

使用消息组

你只需要更改 JMS 生产者,以便用你选择的任何 String 值填充 JMSXGroupID 消息头。

示例

Mesasge message = session.createTextMessage("<foo>hey</foo>");
message.setStringProperty("JMSXGroupID", "IBM\_NASDAQ\_20/4/05");
...
producer.send(message);

关闭消息组

你通常不需要关闭消息组;只需继续使用它。但是,如果你确实想要关闭组,可以添加一个负序列号。

示例

Mesasge message = session.createTextMessage("<foo>hey</foo>");
message.setStringProperty("JMSXGroupID", "IBM\_NASDAQ\_20/4/05");
message.setIntProperty("JMSXGroupSeq", -1);
...
producer.send(message);

这将 关闭 消息组,因此如果将来使用相同的消息组 ID 发送另一条消息,它将被重新分配给一个新的消费者。

影响

消息组意味着你可以获得跨消费者集群进行消息 网格 处理的强大功能,包括可靠性、自动故障转移、负载均衡,但你还可以对消息处理进行排序。因此,它兼具两者的优点。但是,使用上面的示例,消息组实际上所做的事情是使用用户定义的划分策略(JMSXGroupID 值)跨消费者划分工作负载。

最棒的是,你可以做一些很酷的事情,例如使用大量的 RAM 缓存;在 MSFT 消费者中保留 MSFT 的 RAM 中的顺序;在 IBM 消费者中保留 IBM 的 RAM 中的顺序 - 由于消息代理为你进行了划分,你无需依赖于具有缓存间同步和锁定的分布式缓存来利用缓存。

最棒的是,对于应用程序开发人员来说,它看起来像一个简单的 1 个消费者世界,你可以在其中处理消息并完成工作;让代理为你完成所有困难的事情

  • 划分流量
  • 消息组跨消费者的负载均衡
  • 随着消费者来来去去,组的自动故障转移到不同的消费者

总之,如果排序或每条消息的缓存和同步对你很重要,我们强烈建议你使用消息组来划分流量。

获取有关消息组所有权更改的通知

ActiveMQ Classic 支持一个名为 JMSXGroupFirstForConsumer 的布尔标头。此标头在发送给消费者以用于特定消息组的第一条消息上设置。

如果 JMS 连接使用 failover: 并且发生临时网络错误,导致连接断开与代理的连接并在一段时间后重新连接,则 JMS 客户端的幕后会创建一个新的消费者实例,从而可能导致对相同消息组设置具有此标头的另一条消息。

示例

String groupId = message.getStringProperty("JMSXGroupId");

if (message.getBooleanProperty("JMSXGroupFirstForConsumer")) {
   // flush cache for groupId
}

刷新缓存以确保在遇到网络错误时保持一致状态。

添加新消费者

如果你在代理中存在现有消息,并且在稍后阶段添加了消费者,最好延迟消息分发启动,直到所有消费者都存在(或者至少留出足够的时间让他们订阅)。如果你没有这样做,第一个消费者可能会获取所有消息组,所有消息都将分发给它。你可以通过使用 consumersBeforeDispatchStartstimeBeforeDispatchStarts 目标策略 来实现这一点。

consumersBeforeDispatchStartstimeBeforeDispatchStarts 都设置为大于零的值时,分发将在所需数量的消费者出现或 timeBeforeDispatchStarts 超时到期后立即开始。如果仅设置 consumersBeforeDispatchStarts,则消费者连接的超时时间为 1 秒。如果所有消费者都断开连接,则消息分发延迟将在下一个消费者连接时再次应用。

以下是对延迟 200ms 分发的目标策略的示例

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">" timeBeforeDispatchStarts="200"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>

以下代码片段显示了如何在分发开始之前等待两个消费者(或两秒钟)

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="2000"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>

正如 相应的测试用例 所示,在分发之前添加一个小的暂停时间或设置最小消费者数量,可以确保消息组的均匀分布。

如果你需要出于任何原因手动重新平衡消息组,可以通过对相应队列的 JMX MBean 执行 removeAllMessageGroups 操作来实现。

内存消耗、负载均衡、复杂性等的竞争需求。

名为 CachedMessageGroupMap 的默认行为仅限于 LRU 缓存中的 1024 个消息组,可能与你对消息顺序的预期不符。 CachedMessageGroupMap 的内存使用量有限,但仅跟踪最多 1024 个(或配置的最大大小)组,然后会丢失对任何比最新 1024 个更旧的组的跟踪。这样,如果组数超过最大值,最旧组的顺序将丢失

通常,用户会关闭组,以便可以将内存中的集合保留在配置的限制之下。在 AMQ-6851 中有一些有用的讨论。

为了防止这种限制,可以使用 MessageGroupHashBucketSimpleMessageGroupMap。它们的工作方式是将每个组与一个消费者关联。

SimpleMessageGroupMap 会跟踪每个组,但内存使用量不受限制。

以下代码片段显示了如何启用它

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">">
          <messageGroupMapFactory>
            <simpleMessageGroupMapFactory/>
          </messageGroupMapFactory>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>

MessageGroupHashBucked 会跟踪每个组,并且内存使用量有限。以下代码片段显示了如何启用它

<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry queue=">">
          <messageGroupMapFactory>
            <messageGroupHashBucked cachedSize=1024 />
          </messageGroupMapFactory>
      </policyEntry>
    </policyEntries>
  </policyMap>
</destinationPolicy>

Apache、ActiveMQ、Apache ActiveMQ、Apache 羽毛徽标和 Apache ActiveMQ 项目徽标是 Apache 软件基金会的商标。版权所有 © 2024,Apache 软件基金会。根据 Apache 许可证 2.0 授权。