消息重新传递和 DLQ 处理
开发者 > 开发者指南 > 设计文档 > 消息重新传递和 DLQ 处理
概述
当出现以下任何情况时,消息将重新传递给客户端
- 使用事务会话,并调用
rollback()
。 - 在调用
commit()
之前关闭事务会话。 - 会话使用
CLIENT_ACKNOWLEDGE
,并调用Session.recover()
。 - 客户端连接超时(可能是正在执行的代码超过了配置的超时时间)。
代理在其 BrokerInfo
命令数据包中向客户端连接传输其首选的默认传递策略。但客户端可以使用 ActiveMQConnection.getRedeliveryPolicy()
方法覆盖策略设置。
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);
一旦消息的重新传递尝试次数超过了为 重新传递策略 配置的 maximumRedeliveries
,将向代理发送“Poison ACK”以告知代理该消息被视为毒丸。然后,代理将获取该消息并将其发送到死信队列,以便以后进行分析。
ActiveMQ Classic 中的默认死信队列称为 ActiveMQ.DLQ
;所有无法传递的消息都将被发送到此队列,这可能难以管理。因此,您可以在 activemq.xml
配置文件中的目标策略映射中设置 individualDeadLetterStrategy
,这允许您为给定的队列或主题指定特定的死信队列前缀。如果您愿意,可以使用通配符应用此策略,以便所有队列都拥有自己的死信队列,如下面的示例所示。
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- Set the following policy on all queues using the '>' wildcard -->
<policyEntry queue=">">
<deadLetterStrategy>
<!--
Use the prefix 'DLQ.' for the destination name, and make
the DLQ a queue rather than a topic
-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
有关策略选项的更多详细信息,请参见 重新传递策略 部分。
自动丢弃过期消息
有些人只需要丢弃过期消息,而不是将其发送到 DLQ,即完全跳过 DLQ。这简化了 DLQ 的管理,因此您无需筛选大量过期消息来查找存在实际问题的消息。要告诉 ActiveMQ Classic 只丢弃过期消息,请在死信策略上将 processExpired
属性配置为 false。
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- Set the following policy on all queues using the '>' wildcard -->
<policyEntry queue=">">
<!--
Tell the dead letter strategy not to process expired messages
so that they will just be discarded instead of being sent to
the DLQ
-->
<deadLetterStrategy>
<sharedDeadLetterStrategy processExpired="false" />
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
将非持久性消息放入死信队列
默认情况下,ActiveMQ Classic 不会将无法传递的非持久性消息放入死信队列。这种行为背后的原因是,如果应用程序不太关心使消息持久化,那么记录该消息无法传递几乎没有价值。如果您确实想要将非持久性消息放入死信队列,那么您应该在死信策略上设置 processNonPersistent="true"
。
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<!-- Set the following policy on all queues using the '>' wildcard -->
<policyEntry queue=">">
<!--
Tell the dead letter strategy to also place non-persisted messages
onto the dead-letter queue if they can't be delivered.
-->
<deadLetterStrategy>
<sharedDeadLetterStrategy processNonPersistent="true" />
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
在 DLQ 中设置消息的过期时间
默认情况下,ActiveMQ Classic 永远不会使发送到 DLQ 的消息过期。但是,从 ActiveMQ Classic 5.12 开始, deadLetterStrategy
支持一个 expiration
属性,其值为毫秒。
选择性地应用此属性。特别是,不要通过在默认策略或包含通配符的策略条目上设置过期时间来将过期时间应用于您的 DLQ 目标。
如果 DLQ 条目过期并转发到具有过期时间的相同或其他 DLQ,则将引入一个循环,如果策略审计被禁用或滑动窗口被超出,则该循环可能会出现问题。
<broker>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="QueueWhereItIsOkToExpireDLQEntries">
<deadLetterStrategy>
<.... expiration="300000"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
消息审计
死信策略具有一个默认情况下启用的消息审计。这可以防止将重复消息添加到配置的 DLQ 中。从 5.15.0 开始,可以通过以下方式配置审计的限制:
maxProducersToAudit
和 maxAuditDepth
属性。可以使用 enableAudit="false"
禁用审计。
丢弃 DLQ 插件
从 ActiveMQ Classic 5.9 开始 - 目标
policyEntry
支持丢弃的deadLetterStrategy
<deadLetterStrategy> <discarding/> </deadLetterStrategy>
这与插件的作用相同,但仅针对每个目标。基于插件的正则表达式匹配比目标匹配更强大,因此插件在某些情况下可能仍然有用。
一个非常简单但非常有用的代理插件。此插件允许配置队列和主题,全部或基于 Java SE 正则表达式 匹配,以丢弃已发送到 DLQ 的消息。当使用 常数待处理消息限制策略 或其他驱逐规则时,这非常有用,但您不想承担另一个消费者来清除 DLQ 的开销。
以下是一个丢弃所有内容的基本配置示例
<beans>
<broker>
<plugins>
<discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true"/>
</plugins>
</broker>
</beans>
以下是一个稍微更复杂的示例
<beans>
<broker>
<plugins>
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.29 MY.EXAMPLE.QUEUE.87" reportInterval="1000"/>
</plugins>
</broker>
</beans>
- 请注意,目标名称以空格分隔。
-
reportInterval
属性用于表示我们输出已丢弃消息数量的频率 - 使用0
禁用。
以下是一个更复杂的示例
<beans>
<broker>
<plugins>
<discardingDLQBrokerPlugin dropOnly="MY.EXAMPLE.TOPIC.[0-9]{3} MY.EXAMPLE.QUEUE.[0-9]{3}" reportInterval="3000"/>
</plugins>
</broker>
</beans>
- 请注意,目标名称使用正则表达式。这些匹配每个目标名称末尾的数字
000..999
。
有关更多信息,请参见 DiscardingDLQBrokerPlugin 和 DiscardingDLQBroker 的源代码。
代理重新传递 (v5.7)
通常,消费者会处理重新传递,以便在消息在代理上显示为正在传输时保持消息顺序。这意味着重新传递仅限于单个消费者,除非该消费者终止。这样,代理就不知道重新传递。使用代理重新传递,代理可以通过使用重新发送在延迟后重新传递消息。这是通过一个代理插件实现的,该插件通过调度程序处理通过重新传递进行的死信处理。当总消息顺序不重要,并且消费者之间的吞吐量和负载分配很重要时,这很有用。使用代理重新传递,无法传递给给定消费者的消息可以立即重新分派。
该功能通过以下 XML 配置启用
<broker schedulerSupport="true">
<plugins>
<redeliveryPlugin fallbackToDeadLetter="true"
sendToDlqIfMaxRetriesExceeded="true">
<redeliveryPolicyMap>
<redeliveryPolicyMap>
<redeliveryPolicyEntries>
<!-- a destination specific policy -->
<redeliveryPolicy queue="SpecialQueue"
maximumRedeliveries="4"
redeliveryDelay="10000"/>
</redeliveryPolicyEntries>
<defaultEntry>
<!-- the fallback policy for all other destinations -->
<redeliveryPolicy maximumRedeliveries="4"
initialRedeliveryDelay="5000"
redeliveryDelay="10000"/>
</defaultEntry>
</redeliveryPolicyMap>
</redeliveryPolicyMap>
</redeliveryPlugin>
</plugins>
</broker>
熟悉的 重新传递策略 已扩展为接受匹配的目标。 fallbackToDeadLetter
控制当没有匹配目标的重新传递策略时的操作。默认为 true
,因此将进行常规 DLQ 处理。 sendToDlqIfMaxRetriesExceeded
控制当重试限制超过时执行的操作。默认为 true,因此将进行常规 DLQ 处理。当 false
时,该消息将被丢弃。
ActiveMQ Classic 的
schedulerSupport
必须启用此功能才能正常工作。