1. 示例
我们在 Artemis 示例 中提供了一些示例
-
.NET
-
./examples/protocols/amqp/dotnet
-
-
ProtonCPP
-
./examples/protocols/amqp/proton-cpp
-
./examples/protocols/amqp/proton-clustered-cpp
-
-
Ruby
-
./examples/protocols/amqp/proton-ruby
-
-
Java(使用 qpid JMS 客户端)
-
./examples/protocols/amqp/queue
-
-
拦截器
-
./examples/features/standard/interceptor-amqp
-
./examples/features/standard/broker-plugin
-
2. 消息转换
在发送 AMQP 和接收 AMQP 时,代理不会对任何其他协议执行任何消息转换。
但是,如果您打算将消息发送到 AMQP JMS 客户端,则必须遵循 JMS 映射约定。如果您发送此规范无法识别的正文类型,则 AMQP 和任何其他协议之间的转换将使其成为二进制消息。如果您打算跨协议或跨语言,请确保您遵循这些约定。特别是消息正文。
一个兼容性设置允许将 AMQP 队列的命名约定(JMS 持久订阅和共享订阅)与 CORE 对齐。出于向后兼容性的原因,您需要通过代理配置显式启用此设置
- amqp-use-core-subscription-naming
-
-
true
- 使用与 CORE 对齐的队列命名约定。 -
false
(默认) - 使用旧的命名约定。
-
3. 拦截和更改消息
我们不建议出于以下几个原因在服务器端更改消息
-
AMQP 消息应该是不变的
-
消息将不再是用户发送的原始消息
-
AMQP 具有签名消息的可能性。签名会被破坏。
-
出于性能原因。我们尽量不重新编码(甚至解码)消息。
如果您仍然需要和想要拦截并更改 AMQP 消息,请参考前面提到的拦截器示例。
4. AMQP 和安全
Apache ActiveMQ Artemis 服务器接受 PLAIN、ANONYMOUS 和 GSSAPI SASL 机制。这些机制在代理的 安全 基础设施上实现。
5. AMQP 和目标
如果 AMQP 链接是动态的,则会创建一个临时队列,并且远程源或远程目标地址将设置为临时队列的名称。如果链接不是动态的,则将使用远程目标或源的地址作为队列。如果该地址不存在,则如果设置允许,将自动创建它。
6. AMQP 和组播地址(主题)
虽然 AMQP 没有“主题”的概念,但仍然可以将 AMQP 消费者或接收器视为订阅,而不仅仅是队列上的消费者。默认情况下,任何连接到仅启用了 multicast
的地址的接收链接都将被视为订阅,并且将创建一个相应的订阅队列。如果 Terminus 持久性为 UNSETTLED_STATE
或 CONFIGURATION
,则队列将被设置为持久(类似于 JMS 持久订阅),并使用容器 ID 和链接名称组合而成的名称,例如 my-container-id:my-link-name
。如果 Terminus 持久性配置为 NONE
,则将创建一个易失性的 multicast
队列。
7. AMQP 和协调 - 处理事务
AMQP 链接目标也可以是协调器。协调器用于处理事务。如果使用协调器,则底层服务器会话将被交易,并且将通过协调器回滚或提交。
AMQP 允许每个会话使用多个事务, |
8. AMQP 消息调度
AMQP 消息可以提供调度信息,该信息控制消息在未来最早可以被传递的时间。此信息是通过向发送的消息添加消息注释来提供的。
可以使用两种不同的消息注释来调度消息以延迟传递
- x-opt-delivery-time
-
指定的值必须是正长整数,对应于消息应该可用于传递的时间(以毫秒为单位)。
- x-opt-delivery-delay
-
指定的值必须是正长整数,对应于代理在将消息设置为可用于传递之前,应等待的毫秒数。
如果同一消息中同时存在这两个注释,则代理将优先选择更具体的 x-opt-delivery-time
值。
9. DLQ 和过期传输
AMQP 消息将在传输到 DLQ 或过期队列之前被复制,并在该过程中会接收属性和注释。
代理还保留一个仅限内部的属性(称为额外属性),这些属性不会公开给客户端,这些属性也会在此过程中填充。
以下是 AMQP 消息在传输时将接收到的注释和属性名称列表
注释名称 | 内部属性名称 | 描述 |
---|---|---|
|
|
传输前的原始消息 ID |
|
|
过期发生时的时间。自纪元时间以来的毫秒数 |
|
|
传输前的原始队列名称 |
|
|
传输前的原始地址名称 |
10. 按消息注释过滤
如果您在注释名称之前使用前缀“m.”,则可以按消息注释进行过滤。
例如,如果您想过滤发送到特定目标的消息,您可以根据此创建相应的过滤器
ConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:5672");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
javax.jms.Queue queue = session.createQueue("my-DLQ");
MessageConsumer consumer = session.createConsumer(queue, "\"m.x-opt-ORIG-ADDRESS\"='ORIGINAL_PLACE'");
Message message = consumer.receive();
代理将设置内部属性。如果您打算在 DLQ 或过期后进行过滤,您可以选择内部属性名称
// Replace the consumer creation on the previous example:
MessageConsumer consumer = session.createConsumer(queue, "_AMQ_ORIG_ADDRESS='ORIGINAL_PLACE'");
11. 配置 AMQP 空闲超时
可以通过在接受器上以毫秒为单位设置属性 amqpIdleTimeout 来配置 AMQP 服务器的空闲超时。
这将使服务器向客户端发送 AMQP 帧打开,其中包含您配置的超时的 1/2。
因此,如果您将 AMQP 空闲超时配置为 60000,则服务器将告知客户端每 30000 毫秒发送一次帧。
<acceptor name="amqp">.... ;amqpIdleTimeout=<configured-timeout>; ..... </acceptor>
11.1. 禁用保持活动检查
如果您设置 amqpIdleTimeout=0,这将告知客户端不要向服务器发送保持活动数据包。在这种情况下,您将依赖 TCP 来确定何时需要关闭套接字。
<acceptor name="amqp">.... ;amqpIdleTimeout=0; ..... </acceptor>
这包含一个配置 amqpIdleTimeout 的真实示例
<acceptor name="amqp">tcp://0.0.0.0:5672?amqpIdleTimeout=0;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;directDeliver=false;batchDelay=10</acceptor>
12. WebSockets
Apache ActiveMQ Artemis 还支持通过 WebSockets 使用 AMQP。支持 WebSockets 的现代 Web 浏览器可以发送和接收 AMQP 消息。
通过普通 AMQP 接受器支持通过 WebSockets 使用 AMQP
<acceptor name="amqp-ws-acceptor">tcp://127.0.0.1:5672?protocols=AMQP</acceptor>
使用此配置,Apache ActiveMQ Artemis 将在端口 5672
上通过 WebSockets 接受 AMQP 连接。然后,Web 浏览器可以使用 WebSockets 连接到 ws://<server>:5672
,以发送和接收 AMQP 消息。