调度策略

功能 > 消息调度功能 > 调度策略

调度策略

队列的调度策略

可插拔的调度策略仅适用于主题。对于队列,调度更加静态,您可以选择轮询(默认)或严格顺序。在讨论调度策略之前,先了解一下预取值的目的

ActiveMQ Classic 的开箱即用配置专为高性能和高吞吐量消息传递而设计,在这些消息传递中,需要尽快将大量消息调度到消费者。因此,默认的预取值相当大,默认的调度策略将尝试尽快填充预取缓冲区。

但是,在消息传递中,有许多用例,有时默认配置并不适合您的用例;当您发送少量消息时,它们往往都发送给一个消费者,除非您有大量消息。如果您有大量消费者,并且有一个相对较高的预取值,并且您有一些消息需要相当长的时间来处理,那么默认的调度策略可能会导致处理所有消息所需的时间增加(因为对于少量消息来说,负载均衡并不公平)。

对于队列,您可以定义调度是轮询方式进行(默认行为),还是在某个消费者的预取缓冲区耗尽之前调度过程才会选择下一个消费者(strictOrderDispatch)。

后一种行为可以通过在<policyEntry />元素上设置strictOrderDispatch属性来启用。例如:

<policyEntry queue=">" strictOrderDispatch="false" />

观察消费者优先级,因此,如果您有几个优先级不同的消费者,那么优先级最高的消费者将首先被填充,直到它无法再接收消息,然后是下一个消费者,依此类推。

从 5.14.0 版本开始,strictOrderDispatch=true 选项将确保在单个消费者的情况下对重新调度消息进行严格排序。

主题的调度策略

主题有更多选项,因为调度策略是可插拔的。任何实现 org.apache.activemq.broker.region.policy.DispatchPolicy 的类都可以使用。

默认的org.apache.activemq.broker.region.policy.SimpleDispatchPolicy 会像预期的那样工作,将消息传递给所有订阅者。更高级的实现示例是org.apache.activemq.broker.region.policy.PriorityNetworkDispatchPolicy,它只调度到优先级最高的网络消费者。这在环状网络拓扑中很有用,在这种拓扑中,存在不止一条路由到消费者。

这是一个目标策略配置示例

<destinationPolicy>
   <policyMap>
      <policyEntries>
         <policyEntry topic="FOO.>">
            <dispatchPolicy>
               <roundRobinDispatchPolicy />
            </dispatchPolicy>
            <subscriptionRecoveryPolicy>
               <lastImageSubscriptionRecoveryPolicy />
            </subscriptionRecoveryPolicy>
         </policyEntry>
         <policyEntry topic="ORDERS.>">
            <dispatchPolicy>
               <strictOrderDispatchPolicy />
            </dispatchPolicy>
            <!-- 1 minutes worth -->
            <subscriptionRecoveryPolicy>
               <timedSubscriptionRecoveryPolicy recoverDuration="60000" /> 
            </subscriptionRecoveryPolicy>
         </policyEntry>
         <policyEntry topic="PRICES.>">
            <!-- lets force old messages to be discarded for slow consumers --> 
            <pendingMessageLimitStrategy>
               <constantPendingMessageLimitStrategy limit="10"/>
            </pendingMessageLimitStrategy>
            <!-- 10 seconds worth -->
            <subscriptionRecoveryPolicy>
               <timedSubscriptionRecoveryPolicy recoverDuration="10000" />
            </subscriptionRecoveryPolicy>
         </policyEntry>
         <policyEntry tempTopic="true" advisoryForConsumed="true" />
         <policyEntry tempQueue="true" advisoryForConsumed="true" />
      </policyEntries>
   </policyMap>
</destinationPolicy>

Apache、ActiveMQ、Apache ActiveMQ、Apache 羽毛标识和 Apache ActiveMQ 项目标识是 Apache 软件基金会的商标。版权所有 © 2024,Apache 软件基金会。根据Apache 许可证 2.0许可。