慢速消费者
阻塞传输
当使用 TCP 时,网络故障可能会导致写操作阻塞。这会导致整个代理冻结,并且套接字可能永远不会解除阻塞。目前,我们有一个线程用于检查阻塞的套接字,使用扫描协议检测是否在超过可配置时间段内存在阻塞写入的连接。这可以工作,但无法解除与套接字发送关联的调用线程的阻塞(在非持久主题中将是发布线程)。
需要检查关闭套接字是否会解除发送阻塞。
TCP 传输还使用 InactivityMontor 类作为 TransportFilter,该类通过在空闲时定期通过传输强制 KeepAliveInfo 命令来检测死套接字。因此,InactivityMontor 可以假设如果定期没有接收数据包,则表示传输已死,并生成传输异常。
阻塞消费者
这与上面略有不同 - 消费者在处理消息时被阻塞或非常缓慢。在客户端,连接可以保存的消息数量受预取限制(对于非持久主题,预取数量为数千)。
慢速消费者背景
慢速消费者可能会导致代理出现问题。以下是我们可以采取的一些措施。
通常,慢速消费者不会对队列造成太大影响,因为消费者会争夺消息;因此,慢速消费者只是比其他消费者获得的消息更少。
非持久主题
非持久主题是最受慢速消费者影响的场景,因为消息不是持久的,并且消息通常会发送给所有消费者(具有有效选择器的消费者)。
以下是一些我们可以采取的措施
- 阻塞/减慢生产者
- 删除慢速消费者
- 将消息写入磁盘
- 丢弃慢速消费者的消息
这些将作为用户可插拔策略公开。可能值得按目的地逐个进行操作?
持久主题
我们可以从 RAM 中删除消息,因为它们是持久的,因此我们可以很好地处理慢速消费者(假设你有足够的磁盘空间)。
如果消费者落后太多,我们可以考虑杀死消费者;但我认为这更像是后台操作的问题?
持久队列
由于所有消息都是持久的,因此可以从内存中逐出。
非持久队列
慢速消费者对于队列来说并不是真正的问题。但所有消费者都变慢则是一个问题。在这种情况下,我们最终会阻塞生产者,直到消息被消费。
其他选择可能是
- 将消息写入磁盘
- 丢弃消息
实现方案
对于持久消息:引入一种不同的调度模型,其中每个目的地都有一个线程,并且有自己的内存分配。这将使我们能够更好地控制调度,并允许我们为不同的目的地设置不同的优先级。
对于非持久消息 - 在生产者代理线程和写入消费者套接字之间引入一个可选的间接级别非常重要。这将允许我们插入写入磁盘、丢弃消息并杀死阻塞的套接字,而不会影响代理中的任何其他连接。
我们当前的默认行为是阻塞生产者,直到慢速消费者赶上(对于这里的非持久主题)。
另一个可行的选择是,如果消费者被标记为慢速消费者,那么我们可以丢弃传递给它的消息,直到它不再是慢速消费者。如果我们有一种方法可以将订阅对象标记为缓慢,那么这应该很容易实现。
随着时间的推移,将引入更多高级变体。这可能包括
- 仅根据慢速消费者的百分比激活慢速消费者策略 - 例如,如果所有消费者都很慢,你可能希望阻塞发布者,但如果只有一两个消费者很慢,你可能希望采取一些措施
- 关闭慢速消费者
- 将有限数量的消息写入磁盘
- 自定义丢弃策略 - 你可能希望根据预设模式或选择器丢弃消息
- 以上方案的组合……
慢速消费者检测器
我们需要一个好的方法来检测消费者是否很慢 - 以及了解消费者何时再次加速。