Apache ActiveMQ Artemis 可以配置为对超过配置大小的消息进行特殊处理。 而不是将这些消息的整个内容存储在内存中,代理将在队列上保留一个轻量级对象,该对象引用内容(例如,在文件或数据库表中)。

这在核心协议和 AMQP 协议上都受支持。

1. 配置服务器

使用 文件日志 时,大型消息会存储在服务器上的磁盘上。 配置属性 large-messages-directory 指定大型消息的存储位置。

<configuration...>
   <core...>
      ...
      <large-messages-directory>data/large-messages</large-messages-directory>
      ...
   </core>
</configuration>

默认情况下,large-messages-directorydata/largemessages

为了获得最佳性能,我们建议使用文件日志,并将大型消息目录放在与消息日志或分页目录不同的物理卷上。

对于 JDBC 持久性,应配置 large-message-table

<configuration...>
   <core...>
      ...
      <store>
         <database-store>
            ...
            <large-message-table-name>LARGE_MESSAGES_TABLE</large-message-table-name>
            ...
         </database-store>
      </store>
      ...
   </core>
</configuration>

默认情况下,large-message-tableLARGE_MESSAGE_TABLE

默认情况下,将最终字节写入大型消息时,所有写入都会同步到存储介质。 这可以通过 large-message-sync 进行配置,例如:

<configuration...>
   <core...>
      ...
      <large-message-sync>true</large-message-sync>
      ...
   </core>
</configuration>

默认情况下,large-message-synctrue

2. 配置核心客户端

任何大于特定大小的消息都被视为大型消息。 大型消息将被拆分并以片段形式发送。 这是由 URL 参数 minLargeMessageSize 决定的。

Apache ActiveMQ Artemis 消息使用每字符 2 字节进行编码,因此如果消息数据填充了 ASCII 字符(为 1 字节),则生成的 Apache ActiveMQ Artemis 消息的大小将大致翻倍。 在计算“大型”消息的大小方面,这一点很重要,因为在发送之前它可能看起来小于 minLargeMessageSize,但一旦它被编码,它就会变成“大型”消息。

默认值为 100KiB。

直接从客户端配置传输 将提供有关如何实例化核心会话工厂或 JMS 连接工厂的更多信息。

3. 核心协议上的压缩大型消息

您可以选择使用 compressLargeMessage URL 参数以压缩形式发送大型消息。

如果您将布尔 URL 参数 compressLargeMessage 指定为 true,系统将在将消息传输到服务器端时使用 ZIP 算法压缩消息体。 请注意,服务器端没有特殊处理,所有压缩和解压缩都在客户端完成。

可以通过设置一个可选参数来进一步调整此行为:compressionLevel。 这将决定消息体应该压缩多少。 compressionLevel 接受一个 -1 的整数或 0-9 之间的数值。 默认值为 -1,对应于大约 6-7 级。

如果大型消息的压缩大小低于 minLargeMessageSize,则将其作为常规消息发送到服务器。 这意味着消息不会写入服务器的大型消息数据目录,从而减少磁盘 I/O。

较高的 compressionLevel 意味着消息体将被进一步压缩,但这以速度和计算开销为代价。 确保根据特定用例调整此值。

4. 从核心协议流式传输大型消息

Apache ActiveMQ Artemis 支持使用输入和输出流(java.lang.io)设置消息体。

这些流将直接用于发送(输入流)和接收(输出流)消息。

接收消息时,有两种方法可以处理输出流;您可以选择在使用 ClientMessage.saveOutputStream 方法恢复输出流时阻塞,或者选择使用 ClientMessage.setOutputstream 方法,该方法将异步将消息写入流。 如果您选择后者,则消费者必须保持活动状态,直到消息完全接收。

您可以使用任何类型的流。 最常见的用例是发送存储在磁盘上的文件,但您也可以发送诸如 JDBC Blob、SocketInputStream 之类的东西,这些东西是从 HTTPRequests 等恢复的。 只要它实现了 java.io.InputStream 用于发送消息或 java.io.OutputStream 用于接收消息,任何东西都可以。

4.1. 通过核心 API 流式传输

下表显示了在 ClientMessage 上可用的方法列表,这些方法也可以通过使用对象属性通过 JMS 使用。

名称 描述 JMS 等效项

setBodyInputStream(InputStream)

设置用于在发送消息时读取消息体的 InputStream。

JMS_AMQ_InputStream

setOutputStream(OutputStream)

设置将接收消息体的 OutputStream。 此方法不会阻塞。

JMS_AMQ_OutputStream

saveOutputStream(OutputStream)

将消息体保存到 OutputStream。 它将阻塞,直到整个内容传输到 OutputStream

JMS_AMQ_SaveStream

在接收核心消息时设置输出流

ClientMessage msg = consumer.receive(...);

// This will block here until the stream was transferred
msg.saveOutputStream(someOutputStream);

ClientMessage msg2 = consumer.receive(...);

// This will not wait the transfer to finish
msg2.setOutputStream(someOtherOutputStream);

在发送核心消息时设置输入流

ClientMessage msg = session.createMessage();
msg.setInputStream(dataInputStream);

还要注意,对于大小超过 2GiB 的消息,getBodySize() 将返回无效值,因为这是一个整数(它也被暴露给 JMS API)。 在这些情况下,您可以使用消息属性 _AMQ_LARGE_SIZE。

4.2. 通过 JMS 流式传输

使用 JMS 时,Apache ActiveMQ Artemis 通过设置对象属性来映射核心 API 上的流式传输方法(请参阅上面的 ClientMessage API 表)。 您可以使用 Message.setObjectProperty 方法设置输入和输出流。

InputStream 可以通过正在发送的消息上的 JMS 对象属性 JMS_AMQ_InputStream 进行定义。

BytesMessage message = session.createBytesMessage();

FileInputStream fileInputStream = new FileInputStream(fileInput);

BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);

message.setObjectProperty("JMS_AMQ_InputStream", bufferedInput);

someProducer.send(message);

OutputStream 可以通过以阻塞方式接收的消息上的 JMS 对象属性 JMS_AMQ_SaveStream 进行设置。

BytesMessage messageReceived = (BytesMessage)messageConsumer.receive(120000);

File outputFile = new File("huge_message_received.dat");

FileOutputStream fileOutputStream = new FileOutputStream(outputFile);

BufferedOutputStream bufferedOutput = new BufferedOutputStream(fileOutputStream);

// This will block until the entire content is saved on disk
messageReceived.setObjectProperty("JMS_AMQ_SaveStream", bufferedOutput);

也可以使用属性 JMS_AMQ_OutputStream 以非阻塞方式设置 OutputStream

// This won't wait the stream to finish. You need to keep the consumer active.
messageReceived.setObjectProperty("JMS_AMQ_OutputStream", bufferedOutput);

使用 JMS 时,仅 StreamMessageBytesMessage 支持流式传输大型消息。

4.3. 核心协议上的流式传输备选方案

如果您选择不使用 Apache ActiveMQ Artemis 的 InputStreamOutputStream 功能,您仍然可以通过其他方式直接访问数据。

在核心 API 上,只需像往常一样获取主体字节即可。

ClientMessage msg = consumer.receive();

byte[] bytes = new byte[1024];
for (int i = 0 ;  i < msg.getBodySize(); i += bytes.length)
{
   msg.getBody().readBytes(bytes);
   // Whatever you want to do with the bytes
}

如果使用 JMS API,BytesMessageStreamMessage 也透明地支持它。

BytesMessage rm = (BytesMessage)cons.receive(10000);

byte data[] = new byte[1024];

for (int i = 0; i < rm.getBodyLength(); i += 1024)
{
   int numberOfBytes = rm.readBytes(data);
   // Do whatever you want with the data
}

5. 配置 AMQP 接收器

您可以在接收器上配置属性 amqpMinLargeMessageSize

默认值为 102400(100KBytes)。

将其设置为 -1 将禁用大型消息支持。

将 amqpMinLargeMessageSize 设置为 -1,您的 AMQP 消息可能会存储为核心大型消息,如果消息大小不适合日志。 这是代理的旧语义,出于兼容性原因,它被保留下来。
<acceptors>
      <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
      <acceptor name="amqp">tcp://0.0.0.0:5672?;   ..... amqpMinLargeMessageSize=102400; .... </acceptor>
</acceptors>

6. 大型消息示例

请参阅 大型消息示例,其中显示了大型消息如何配置和使用 JMS。