慢速消费者处理
慢速消费者 会在非持久主题上造成问题,因为它们会导致代理将旧消息保留在 RAM 中,一旦 RAM 填满,就会迫使代理减慢生产者速度,导致快速消费者也被减速。我们将来可以实现的一个选项是将消息写入磁盘 - 但将消息写入磁盘也会减慢快速消费者的速度。
目前,我们有一种策略,可以让您配置代理为消费者保留的匹配消息的最大数量,除了其预取缓冲区之外。一旦达到此最大值,随着新消息的到来,旧消息将被丢弃。这允许您将 RAM 用于当前消息,并继续向慢速消费者发送消息,但丢弃旧消息。
待处理消息限制策略
您可以在目标映射上配置 PendingMessageLimitStrategy
实现类,以便您的主题命名空间的不同区域可以拥有不同的策略来处理慢速消费者。例如,您可能希望对价格使用这种策略,因为价格的流量非常大,但对于订单和交易,由于流量较低,您可能不希望丢弃旧消息。
该策略计算为消费者保留在 RAM 中的待处理消息的最大数量(超过其预取大小)。值为零表示除了预取数量之外不保留任何消息。大于零的值将保留多达该数量的消息,并随着新消息的到来丢弃旧消息。值为 -1
将禁用消息丢弃。
目前有两种不同的策略实现。
ConstantPendingMessageLimitStrategy
PrefetchRatePendingMessageLimitStrategy
ConstantPendingMessageLimitStrategy
该策略对所有消费者使用一个恒定的限制(超过其预取大小)。
示例
<constantPendingMessageLimitStrategy limit="50"/>
PrefetchRatePendingMessageLimitStrategy
该策略使用消费者的预取大小的倍数来计算待处理消息的最大数量。因此,您可以例如为每个消费者保留大约 2.5 倍的预取计数。
<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
使用预取策略配置限制
JMS 客户端有一个 预取策略,您可以使用它来配置持久和非持久队列和主题的各种预取限制。预取策略还允许您在每个连接/消费者基础上指定 maximumPendingMessageLimit
。配置此值时有一个细微的区别;为了简化与非 JMS 客户端(如 OpenWire)的操作,零值将被忽略;因此,您可以配置的最低值为 1
。
配置驱逐策略
我们有一个 MessageEvictionStrategy
用于确定在慢速消费者上应该驱逐哪条消息。默认实现是
<oldestMessageEvictionStrategy/>
但是,您可以编写自己的策略来使用某些特定于应用程序的方法选择要驱逐的消息。例如,如果您正在发送市场数据价格更新,您可能希望找到一个较旧的价格值,这可能不是最旧的消息。
示例
<uniquePropertyMessageEvictionStrategy propertyName="STOCK"/>
其中 propertyName
是指定价格的 JMS 消息属性。
另一个选择可能是使用具有最低优先级的最旧消息。因此,如果您有一些高优先级消息,则即使它们比较新,也要首先驱逐较低优先级的消息。
<oldestMessageWithLowestPriorityEvictionStrategy/>
示例
以下示例显示了 ActiveMQ Classic 代理配置文件。请注意,对于 PRICES.>
通配符范围内的主题,pendingMessageLimitStrategy
属性被设置为仅为每个消费者保留大约 10
条消息,超过其预取缓冲区大小。
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="https://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
https://activemq.apache.org/schema/core
https://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker xmlns="https://activemq.apache.org/schema/core" persistent="false" brokerName="${brokername}">
<!-- lets define the dispatch policy -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="FOO.>">
<dispatchPolicy>
<roundRobinDispatchPolicy/>
</dispatchPolicy>
<subscriptionRecoveryPolicy>
<lastImageSubscriptionRecoveryPolicy/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="ORDERS.>">
<dispatchPolicy>
<strictOrderDispatchPolicy/>
</dispatchPolicy>
<!-- 1 minutes worth -->
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="60000"/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry topic="PRICES.>">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="10"/>
</pendingMessageLimitStrategy>
<!-- 10 seconds worth -->
<subscriptionRecoveryPolicy>
<timedSubscriptionRecoveryPolicy recoverDuration="10000"/>
</subscriptionRecoveryPolicy>
</policyEntry>
<policyEntry tempTopic="true" advisoryForConsumed="true"/>
<policyEntry tempQueue="true" advisoryForConsumed="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>
使用技巧
建议您,如果您知道某个特定消费者将很慢,那么将其预取大小设置为比快速消费者的小!
例如,如果您知道某个特定服务器速度很慢,并且您有非常高的消息速率,并且您还有一些非常快速的消费者,那么您可能希望启用此功能并将慢速服务器上的预取设置为比快速服务器上略低。
监控慢速消费者的状态
您还可以使用 JMX 控制台来查看活动订阅的统计信息。这允许您在 TopicSubscriptionViewMBean
上查看以下统计信息。
统计信息 | 定义 |
---|---|
discarded |
由于消费者速度慢而导致订阅在生命周期内丢弃的消息数量。 |
matched |
当前与订阅匹配的消息数量,将在预取缓冲区中有一些容量后立即分发给订阅。因此,非零值表示该订阅的预取缓冲区已满。 |