支持 IO 流
非常希望能够为通过 ActiveMQ Classic 传输任意大小的流文件提供强力的支持。基本思路是将流分成多个消息,并通过 JMS 发送/接收这些消息。
有一些问题需要考虑...
用例
- 多个生产者写入,只有一个消费者。理想情况下,每个消费者应该一次完整处理一个流(IO 流通常是阻塞 IO,因此当客户端线程正在读取一个流时,它不能处理其他流)。
- 如果消费者在处理 10GB 文件的 1GB 数据时崩溃,我们需要将消息重新传递给另一个消费者。
目标
我们的目标应该是
- 让每个消费者一次处理一个完整的流,然后再尝试处理其他流。
可能的故障条件
- 消费者可能在读取流的过程中崩溃。恢复选项
- 丢弃流的剩余部分
- 将流的传递重新开始到下一个消费者
- 从失败点继续将流传递给下一个消费者。
- 生产者可能在写入流的过程中崩溃。我们可能需要检测其故障。我们可以
- 在事务中发送流。只有在消息被代理完全接收后,流才会被发送给消费者。缺点:消费者在能够接收消息之前会经历高延迟。
- 消费者超时:如果消息没有及时接收,消费者会认为生产者已崩溃。(如果它没有崩溃呢?)
- 消费者可能在流的中间开始接收。这种情况可能发生
- 如果另一个消费者认为代理已崩溃(但实际上没有)。
- 如果非流消费者意外删除了消息,或者消息由于消费者回滚而被发送到 DLQ。
实现问题
- 我们可以使用消息组来确保同一个消费者处理给定流的所有消息 - 但不幸的是,消息组不能阻止消费者一次处理多个消息组 - 也许这是一个我们可以添加的新功能?
- 避免代理内存不足 - 因此,如果唯一的消费者正在处理另一个流,则将数据写入磁盘(或限制生产者)。
- 如果使用事务发送消息,则可能需要对我们进行事务管理和消息日志记录的方式进行重大更改。目前,正在进行的事务中的所有消息都会保存在内存中,直到提交为止(同步回调会保留这些消息)。日志当前会保留所有已提交消息的日志 + 内存,直到提交为止,它现在不允许日志回滚正在进行的事务中的一部分消息。
- 由于整个流必须一次处理,我们只能确认整个消息流 - 因此我们需要禁用预取?
- 不需要禁用预取,因为消费者会发送一种特殊类型的确认,该确认只会暂时扩展预取窗口。