生产者流量控制
生产者流量控制
在 ActiveMQ Classic 4.x 中,流量控制是使用 TCP 流量控制实现的。受限消费者的底层网络连接被暂停以执行流量控制限制。这种策略非常高效,但如果有多个生产者和消费者共享同一个连接,可能会导致死锁。
从 ActiveMQ Classic 5.0 开始,我们现在可以在共享连接上单独控制每个生产者的流量,而无需暂停整个连接。所谓“流量控制”是指,如果代理检测到目标的内存限制或代理的临时或文件存储限制已超出,则消息流速会被降低。生产者将被阻塞,直到资源可用或将收到 JMSException:此行为是可配置的,并在下面有关<systemUsage>
的部分中进行了说明。
值得注意的是,默认的<systemUsage>
设置会导致生产者在memoryLimit
或<systemUsage>
限制达到时阻塞:这种阻塞行为有时被误解为“生产者挂起”,而实际上生产者只是在勤勤恳恳地等待空间可用。
-
同步发送的消息将自动使用每个生产者的流量控制;这通常适用于同步发送的持久消息,除非你启用了
useAsyncSend
标志。 -
使用异步发送的生产者 - 通常来说,非持久消息的生产者 - 不用等待代理的任何确认;因此,如果内存限制已超出,你不会收到通知。如果你确实希望了解代理限制被超出的情况,则需要配置 ProducerWindowSize 连接选项,以便即使异步消息也能按生产者进行流量控制。
ActiveMQConnectionFactory connctionFactory = connctionFactory.setProducerWindowSize(1024000);
ProducerWindowSize 是生产者在等待代理确认已接受之前发送的消息之前,可以传输到代理的数据的最大字节数。
或者,如果你正在发送非持久消息(默认情况下以异步方式发送),并且希望在队列或主题的内存限制被突破时得到通知,那么你可以简单地将连接工厂配置为“alwaysSyncSend”。虽然这样做会更慢,但它将确保你的消息生产者立即被告知内存问题。
如果你愿意,可以通过在代理配置中将producerFlowControl
标志设置为 false 来禁用代理上特定 JMS 队列和主题的流量控制 - 例如:
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="FOO.>" producerFlowControl="false"/>
</policyEntries>
</policyMap>
</destinationPolicy>
参见代理配置.
请注意,自从 ActiveMQ Classic 5.x 中引入新的文件游标后,非持久消息被转移到临时文件存储中,以减少用于非持久消息的内存使用量。因此,你可能会发现队列的 memoryLimit 从未达到,因为游标没有占用太多内存。如果你真的想将所有非持久消息都保存在内存中,并在达到限制时停止生产者,则应该配置<vmQueueCursor>
。
<policyEntry queue=">" producerFlowControl="true" memoryLimit="1mb">
<pendingQueuePolicy>
<vmQueueCursor/>
</pendingQueuePolicy>
</policyEntry>
上面的片段将确保所有非持久队列消息都保存在内存中,每个队列的限制为 1Mb。
生产者流量控制的工作原理
如果你正在发送持久消息(因此预计会收到OpenWire 消息的响应),那么代理将向生产者发送ProducerAck 消息。这将通知生产者先前发送的窗口已处理,因此它现在可以发送另一个窗口。有点像消费者的确认,但反过来。
优势
因此,一个好的生产者可能会在发送更多数据之前等待生产者的确认,以避免淹没代理(并迫使代理在出现慢速消费者时阻塞整个连接)。要查看如何在源代码中实现这一点,请查看ActiveMQMessageProducer 代码。
虽然客户端可以完全忽略生产者的确认,代理应该在必须处理慢速消费者时暂停传输;虽然这确实意味着它会暂停整个连接。
配置客户端异常
当代理上没有可用空间时,send()
操作的无限期阻塞的另一种方法是在客户端配置抛出异常。通过将sendFailIfNoSpace
属性配置为true
,代理将导致send()
操作失败,并出现javax.jms.ResourceAllocationException
,该异常将传播到客户端。以下是此配置的示例
<systemUsage>
<systemUsage sendFailIfNoSpace="true">
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>
此属性的优点是客户端可以捕获javax.jms.ResourceAllocationException
,等待一段时间,然后重试send()
操作,而不是无限期地挂起。
从 5.3.1 版本开始,添加了sendFailIfNoSpaceAfterTimeout
属性。此属性会导致send()
操作在客户端失败,但仅在等待给定的时间量后才会失败。如果在配置的时间量后代理上仍然没有释放空间,只有send()
操作才会在客户端失败并出现异常。以下是一个示例
<systemUsage>
<systemUsage sendFailIfNoSpaceAfterTimeout="3000">
<memoryUsage>
<memoryUsage limit="20 mb"/>
</memoryUsage>
</systemUsage>
</systemUsage>
超时以毫秒为单位定义,因此上面的示例在向客户端抛出异常导致send()
操作失败之前会等待三秒钟。此属性的优点是它会阻塞配置的时间量,而不是立即失败或无限期地阻塞。此属性不仅对代理方进行了改进,而且对客户端也进行了改进,因此它可以捕获异常,等待一段时间,然后重试send()
操作。
从 5.16.0 版本开始,可以通过目标策略在每个目标的基础上配置sendFailIfNoSpace
和sendFailIfNoSpaceAfterTimeout
。
禁用流量控制
一个常见的需求是禁用流量控制,以便消息分发持续进行,直到所有可用的磁盘都被挂起的消息(无论是持久消息还是非持久消息)占用。为此,请启用消息游标.
系统使用情况
你也可以通过<systemUsage>
元素上的某些属性来降低生产者的速度。请查看以下示例
<systemUsage>
<systemUsage>
<memoryUsage>
<memoryUsage limit="64 mb" />
</memoryUsage>
<storeUsage>
<storeUsage limit="100 gb" />
</storeUsage>
<tempUsage>
<tempUsage limit="10 gb" />
</tempUsage>
</systemUsage>
</systemUsage>
你可以为NON_PERSISTENT
消息设置内存限制,为PERSITENT
消息设置磁盘存储限制,为临时消息设置总使用量限制,代理将在降低生产者的速度之前使用这些限制。使用上面显示的默认设置,代理将阻塞send()
调用,直到某些消息被消费并且代理上有空间可用为止。默认值如上所示,你可能需要为你的环境增加这些值。
百分比使用率
StoreUsage 和 TempUsage 都支持一个 percentLimit 属性,其中限制被确定为可用总量的百分比。从 5.15.x 版本开始,还有一个相关的 total 属性,可以用来明确设置可用总量,以便不会查询文件系统。这在以下情况下很有用:只有磁盘分区的一部分可用于代理,或者底层文件存储报告了超过 Long.MAX_VALUE 的可用容量(例如:EFS),这将导致 java.io.File#getTotalSpace 返回的 long 值溢出。请注意,当指定了总量时,不会针对文件系统再次验证实际的可用数据,而只是相对于该绝对总量的存储使用情况。