Apache ActiveMQ Artemis 可以配置为对超过配置大小的消息进行特殊处理。 而不是将这些消息的整个内容存储在内存中,代理将在队列上保留一个轻量级对象,该对象引用内容(例如,在文件或数据库表中)。
这在核心协议和 AMQP 协议上都受支持。
1. 配置服务器
使用 文件日志 时,大型消息会存储在服务器上的磁盘上。 配置属性 large-messages-directory
指定大型消息的存储位置。
<configuration...>
<core...>
...
<large-messages-directory>data/large-messages</large-messages-directory>
...
</core>
</configuration>
默认情况下,large-messages-directory
为 data/largemessages
。
为了获得最佳性能,我们建议使用文件日志,并将大型消息目录放在与消息日志或分页目录不同的物理卷上。 |
对于 JDBC 持久性,应配置 large-message-table
。
<configuration...>
<core...>
...
<store>
<database-store>
...
<large-message-table-name>LARGE_MESSAGES_TABLE</large-message-table-name>
...
</database-store>
</store>
...
</core>
</configuration>
默认情况下,large-message-table
为 LARGE_MESSAGE_TABLE
。
默认情况下,将最终字节写入大型消息时,所有写入都会同步到存储介质。 这可以通过 large-message-sync
进行配置,例如:
<configuration...>
<core...>
...
<large-message-sync>true</large-message-sync>
...
</core>
</configuration>
默认情况下,large-message-sync
为 true
。
2. 配置核心客户端
任何大于特定大小的消息都被视为大型消息。 大型消息将被拆分并以片段形式发送。 这是由 URL 参数 minLargeMessageSize
决定的。
Apache ActiveMQ Artemis 消息使用每字符 2 字节进行编码,因此如果消息数据填充了 ASCII 字符(为 1 字节),则生成的 Apache ActiveMQ Artemis 消息的大小将大致翻倍。 在计算“大型”消息的大小方面,这一点很重要,因为在发送之前它可能看起来小于 |
默认值为 100KiB。
直接从客户端配置传输 将提供有关如何实例化核心会话工厂或 JMS 连接工厂的更多信息。
3. 核心协议上的压缩大型消息
您可以选择使用 compressLargeMessage
URL 参数以压缩形式发送大型消息。
如果您将布尔 URL 参数 compressLargeMessage
指定为 true,系统将在将消息传输到服务器端时使用 ZIP 算法压缩消息体。 请注意,服务器端没有特殊处理,所有压缩和解压缩都在客户端完成。
可以通过设置一个可选参数来进一步调整此行为:compressionLevel
。 这将决定消息体应该压缩多少。 compressionLevel
接受一个 -1 的整数或 0-9 之间的数值。 默认值为 -1,对应于大约 6-7 级。
如果大型消息的压缩大小低于 minLargeMessageSize
,则将其作为常规消息发送到服务器。 这意味着消息不会写入服务器的大型消息数据目录,从而减少磁盘 I/O。
较高的 compressionLevel 意味着消息体将被进一步压缩,但这以速度和计算开销为代价。 确保根据特定用例调整此值。 |
4. 从核心协议流式传输大型消息
Apache ActiveMQ Artemis 支持使用输入和输出流(java.lang.io
)设置消息体。
这些流将直接用于发送(输入流)和接收(输出流)消息。
接收消息时,有两种方法可以处理输出流;您可以选择在使用 ClientMessage.saveOutputStream
方法恢复输出流时阻塞,或者选择使用 ClientMessage.setOutputstream
方法,该方法将异步将消息写入流。 如果您选择后者,则消费者必须保持活动状态,直到消息完全接收。
您可以使用任何类型的流。 最常见的用例是发送存储在磁盘上的文件,但您也可以发送诸如 JDBC Blob、SocketInputStream
之类的东西,这些东西是从 HTTPRequests
等恢复的。 只要它实现了 java.io.InputStream
用于发送消息或 java.io.OutputStream
用于接收消息,任何东西都可以。
4.1. 通过核心 API 流式传输
下表显示了在 ClientMessage
上可用的方法列表,这些方法也可以通过使用对象属性通过 JMS 使用。
名称 | 描述 | JMS 等效项 |
---|---|---|
setBodyInputStream(InputStream) |
设置用于在发送消息时读取消息体的 InputStream。 |
JMS_AMQ_InputStream |
setOutputStream(OutputStream) |
设置将接收消息体的 OutputStream。 此方法不会阻塞。 |
JMS_AMQ_OutputStream |
saveOutputStream(OutputStream) |
将消息体保存到 |
JMS_AMQ_SaveStream |
在接收核心消息时设置输出流
ClientMessage msg = consumer.receive(...);
// This will block here until the stream was transferred
msg.saveOutputStream(someOutputStream);
ClientMessage msg2 = consumer.receive(...);
// This will not wait the transfer to finish
msg2.setOutputStream(someOtherOutputStream);
在发送核心消息时设置输入流
ClientMessage msg = session.createMessage();
msg.setInputStream(dataInputStream);
还要注意,对于大小超过 2GiB 的消息,getBodySize() 将返回无效值,因为这是一个整数(它也被暴露给 JMS API)。 在这些情况下,您可以使用消息属性 _AMQ_LARGE_SIZE。
4.2. 通过 JMS 流式传输
使用 JMS 时,Apache ActiveMQ Artemis 通过设置对象属性来映射核心 API 上的流式传输方法(请参阅上面的 ClientMessage API 表)。 您可以使用 Message.setObjectProperty
方法设置输入和输出流。
InputStream
可以通过正在发送的消息上的 JMS 对象属性 JMS_AMQ_InputStream 进行定义。
BytesMessage message = session.createBytesMessage();
FileInputStream fileInputStream = new FileInputStream(fileInput);
BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
message.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);
someProducer.send(message);
OutputStream
可以通过以阻塞方式接收的消息上的 JMS 对象属性 JMS_AMQ_SaveStream 进行设置。
BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000);
File outputFile = new File("huge_message_received.dat");
FileOutputStream fileOutputStream = new FileOutputStream(outputFile);
BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);
// This will block until the entire content is saved on disk
messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);
也可以使用属性 JMS_AMQ_OutputStream 以非阻塞方式设置 OutputStream
。
// This won't wait the stream to finish. You need to keep the consumer active.
messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);
使用 JMS 时,仅 |
4.3. 核心协议上的流式传输备选方案
如果您选择不使用 Apache ActiveMQ Artemis 的 InputStream
或 OutputStream
功能,您仍然可以通过其他方式直接访问数据。
在核心 API 上,只需像往常一样获取主体字节即可。
ClientMessage msg = consumer.receive();
byte[] bytes = new byte[1024];
for (int i = 0 ; i < msg.getBodySize(); i += bytes.length)
{
msg.getBody().readBytes(bytes);
// Whatever you want to do with the bytes
}
如果使用 JMS API,BytesMessage
和 StreamMessage
也透明地支持它。
BytesMessage rm = (BytesMessage)cons.receive(10000);
byte data[] = new byte[1024];
for (int i = 0; i < rm.getBodyLength(); i += 1024)
{
int numberOfBytes = rm.readBytes(data);
// Do whatever you want with the data
}
5. 配置 AMQP 接收器
您可以在接收器上配置属性 amqpMinLargeMessageSize
。
默认值为 102400(100KBytes)。
将其设置为 -1 将禁用大型消息支持。
将 amqpMinLargeMessageSize 设置为 -1,您的 AMQP 消息可能会存储为核心大型消息,如果消息大小不适合日志。 这是代理的旧语义,出于兼容性原因,它被保留下来。 |
<acceptors>
<!-- AMQP Acceptor. Listens on default AMQP port for AMQP traffic.-->
<acceptor name="amqp">tcp://0.0.0.0:5672?; ..... amqpMinLargeMessageSize=102400; .... </acceptor>
</acceptors>