Apache ActiveMQ Artemis 支持 AMQP 1.0 规范。默认情况下,配置了 acceptor 元素,用于在端口 616165672 上接受 AMQP 连接。

有关配置 AMQP 的 acceptor 的详细信息,请参阅常规的 协议和互操作性 章节。

您可以使用任何与 AMQP 1.0 兼容的客户端。

简短列表包括

1. 示例

我们在 Artemis 示例 中提供了一些示例

  • .NET

    • ./examples/protocols/amqp/dotnet

  • ProtonCPP

    • ./examples/protocols/amqp/proton-cpp

    • ./examples/protocols/amqp/proton-clustered-cpp

  • Ruby

    • ./examples/protocols/amqp/proton-ruby

  • Java(使用 qpid JMS 客户端)

    • ./examples/protocols/amqp/queue

  • 拦截器

    • ./examples/features/standard/interceptor-amqp

    • ./examples/features/standard/broker-plugin

2. 消息转换

在发送 AMQP 和接收 AMQP 时,代理不会对任何其他协议执行任何消息转换。

但是,如果您打算将消息发送到 AMQP JMS 客户端,则必须遵循 JMS 映射约定。如果您发送此规范无法识别的正文类型,则 AMQP 和任何其他协议之间的转换将使其成为二进制消息。如果您打算跨协议或跨语言,请确保您遵循这些约定。特别是消息正文。

一个兼容性设置允许将 AMQP 队列的命名约定(JMS 持久订阅和共享订阅)与 CORE 对齐。出于向后兼容性的原因,您需要通过代理配置显式启用此设置

amqp-use-core-subscription-naming
  • true - 使用与 CORE 对齐的队列命名约定。

  • false(默认) - 使用旧的命名约定。

3. 拦截和更改消息

我们不建议出于以下几个原因在服务器端更改消息

  • AMQP 消息应该是不变的

  • 消息将不再是用户发送的原始消息

  • AMQP 具有签名消息的可能性。签名会被破坏。

  • 出于性能原因。我们尽量不重新编码(甚至解码)消息。

如果您仍然需要和想要拦截并更改 AMQP 消息,请参考前面提到的拦截器示例。

4. AMQP 和安全

Apache ActiveMQ Artemis 服务器接受 PLAIN、ANONYMOUS 和 GSSAPI SASL 机制。这些机制在代理的 安全 基础设施上实现。

5. AMQP 和目标

如果 AMQP 链接是动态的,则会创建一个临时队列,并且远程源或远程目标地址将设置为临时队列的名称。如果链接不是动态的,则将使用远程目标或源的地址作为队列。如果该地址不存在,则如果设置允许,将自动创建它。

6. AMQP 和组播地址(主题)

虽然 AMQP 没有“主题”的概念,但仍然可以将 AMQP 消费者或接收器视为订阅,而不仅仅是队列上的消费者。默认情况下,任何连接到仅启用了 multicast 的地址的接收链接都将被视为订阅,并且将创建一个相应的订阅队列。如果 Terminus 持久性为 UNSETTLED_STATECONFIGURATION,则队列将被设置为持久(类似于 JMS 持久订阅),并使用容器 ID 和链接名称组合而成的名称,例如 my-container-id:my-link-name。如果 Terminus 持久性配置为 NONE,则将创建一个易失性的 multicast 队列。

7. AMQP 和协调 - 处理事务

AMQP 链接目标也可以是协调器。协调器用于处理事务。如果使用协调器,则底层服务器会话将被交易,并且将通过协调器回滚或提交。

AMQP 允许每个会话使用多个事务,amqp:multi-txns-per-ssn,但在 Apache ActiveMQ Artemis 的此版本中,将只支持每个会话一个事务。

8. AMQP 消息调度

AMQP 消息可以提供调度信息,该信息控制消息在未来最早可以被传递的时间。此信息是通过向发送的消息添加消息注释来提供的。

可以使用两种不同的消息注释来调度消息以延迟传递

x-opt-delivery-time

指定的值必须是正长整数,对应于消息应该可用于传递的时间(以毫秒为单位)。

x-opt-delivery-delay

指定的值必须是正长整数,对应于代理在将消息设置为可用于传递之前,应等待的毫秒数。

如果同一消息中同时存在这两个注释,则代理将优先选择更具体的 x-opt-delivery-time 值。

9. DLQ 和过期传输

AMQP 消息将在传输到 DLQ 或过期队列之前被复制,并在该过程中会接收属性和注释。

代理还保留一个仅限内部的属性(称为额外属性),这些属性不会公开给客户端,这些属性也会在此过程中填充。

以下是 AMQP 消息在传输时将接收到的注释和属性名称列表

注释名称 内部属性名称 描述

x-opt-ORIG-MESSAGE-ID

_AMQ_ORIG_MESSAGE_ID

传输前的原始消息 ID

x-opt-ACTUAL-EXPIRY

_AMQ_ACTUAL_EXPIRY

过期发生时的时间。自纪元时间以来的毫秒数

x-opt-ORIG-QUEUE

_AMQ_ORIG_QUEUE

传输前的原始队列名称

x-opt-ORIG-ADDRESS

_AMQ_ORIG_ADDRESS

传输前的原始地址名称

10. 按消息注释过滤

如果您在注释名称之前使用前缀“m.”,则可以按消息注释进行过滤。

例如,如果您想过滤发送到特定目标的消息,您可以根据此创建相应的过滤器

ConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:5672");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
javax.jms.Queue queue = session.createQueue("my-DLQ");
MessageConsumer consumer = session.createConsumer(queue, "\"m.x-opt-ORIG-ADDRESS\"='ORIGINAL_PLACE'");
Message message = consumer.receive();

代理将设置内部属性。如果您打算在 DLQ 或过期后进行过滤,您可以选择内部属性名称

// Replace the consumer creation on the previous example:
MessageConsumer consumer = session.createConsumer(queue, "_AMQ_ORIG_ADDRESS='ORIGINAL_PLACE'");

11. 配置 AMQP 空闲超时

可以通过在接受器上以毫秒为单位设置属性 amqpIdleTimeout 来配置 AMQP 服务器的空闲超时。

这将使服务器向客户端发送 AMQP 帧打开,其中包含您配置的超时的 1/2。

因此,如果您将 AMQP 空闲超时配置为 60000,则服务器将告知客户端每 30000 毫秒发送一次帧。

<acceptor name="amqp">.... ;amqpIdleTimeout=<configured-timeout>; ..... </acceptor>

11.1. 禁用保持活动检查

如果您设置 amqpIdleTimeout=0,这将告知客户端不要向服务器发送保持活动数据包。在这种情况下,您将依赖 TCP 来确定何时需要关闭套接字。

<acceptor name="amqp">.... ;amqpIdleTimeout=0; ..... </acceptor>

这包含一个配置 amqpIdleTimeout 的真实示例

<acceptor name="amqp">tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;directDeliver=false;batchDelay=10</acceptor>

12. WebSockets

Apache ActiveMQ Artemis 还支持通过 WebSockets 使用 AMQP。支持 WebSockets 的现代 Web 浏览器可以发送和接收 AMQP 消息。

通过普通 AMQP 接受器支持通过 WebSockets 使用 AMQP

<acceptor name="amqp-ws-acceptor">tcp://127.0.0.1:5672?protocols=AMQP</acceptor>

使用此配置,Apache ActiveMQ Artemis 将在端口 5672 上通过 WebSockets 接受 AMQP 连接。然后,Web 浏览器可以使用 WebSockets 连接到 ws://<server>:5672,以发送和接收 AMQP 消息。