Stomp
ActiveMQ Classic 支持 Stomp 协议和 Stomp - JMS 映射。这使得用纯 Ruby、Perl、Python 或 PHP 为 ActiveMQ Classic 编写客户端变得很容易。
更多详细信息请参见 Stomp 网站
规范符合性
ActiveMQ Classic v5.6 实现 Stomp v1.1 规范,除了允许在消息头键的开头或结尾处使用空格,它们在头值中保留。在将来的版本中,情况将不再如此,客户端应进行更新,用户代码应经过检查以确保头中的空格是故意存在的,而不是意外的或客户端“功能”。
启用 ActiveMQ Classic Broker 以用于 Stomp
要在代理中启用 STOMP 协议支持,请添加一个传输连接器定义,其 URI 方案为 stomp
。
示例
<transportConnectors>
<transportConnector name="stomp" uri="stomp://127.0.0.1:61613"/>
</transportConnectors>
要查看完整的示例,请尝试 此 XML。如果将该 XML 保存为 foo.xml
,则可以通过命令行运行 stomp,如下所示
activemq xbean:foo.xml
更多帮助,请参见 运行代理。
Stomp 线路格式
Stomp 使用基于文本的线路格式,该格式可以使用以下选项配置。所有选项都可以在代理的传输绑定 URI 上配置。
参数名称 | 默认值 | 描述 |
---|---|---|
maxDataLength |
104857600 |
可以发送的消息体(内容)的最大大小。 |
maxFrameSize |
MAX_LONG |
从 ActiveMQ Classic 5.12.0 开始:可以发送的最大帧大小。Stomp 帧包括一个命令、可选头和一个可选体。可以帮助防止 OOM DOS 攻击 |
示例
<transportConnector name="stomp+ssl" uri="stomp+ssl://127.0.0.1:61612?wireFormat.maxFrameSize=1000000"/>
使用正确的前缀!
线路格式选项必须具有前缀
wireFormat.
才能生效,例如wireFormat.`maxDataLength`=100000
。缺少此前缀的选项将被忽略。
安全
从 ActiveMQ Classic 5.1 开始:Stomp 全面支持 ActiveMQ Classic 的安全 机制。这意味着 CONNECT
命令将在身份验证失败时返回一个 ERROR
STOMP 帧。此外,当你尝试访问(读取/写入)某些目标时,将应用授权策略。如果你使用同步操作(通过使用 回执),则可以在未授权访问尝试的情况下,预期会出现一个 ERROR
帧。在其他情况下,操作将被丢弃,但客户端不会收到错误通知。这适用于可以在代理端发生的任何错误。
SSL
为了获得额外的安全性,你可以使用以下部分中描述的 Stomp over SSL。
启用 Stomp over NIO
从 ActiveMQ Classic 5.3 开始:为了获得更好的可扩展性和性能,可以将 Stomp 协议配置为在 NIO 传输上运行。与相应的 TCP 连接器相比,NIO 传输 将使用更少的线程。当需要支持 大量队列 时,这将有所帮助。要使用 NIO,请将传输连接器的 URI 方案更改为 stomp+nio
。
示例
<transportConnector name="stomp+nio" uri="stomp+nio://127.0.0.1:61612"/>
启用 Stomp over SSL
要将 ActiveMQ Classic 配置为使用 Stomp over SSL 连接,请将 URI 方案更改为 stomp+ssl
。
示例
<transportConnector name="stomp+ssl" uri="stomp+ssl://127.0.0.1:61612"/>
有关在 ActiveMQ Classic 中使用 SSL 的更多详细信息,请参见以下文章 (如何使用 SSL)。可以在 PHP Stomp 客户端示例 中找到在客户端使用 Stomp over SSL 的示例。
心跳宽限期
STOMP 协议(版本 1.1 或更高版本)定义了心跳的概念,作为一种方法,客户端和代理可以通过该方法来确定它们之间底层 TCP 连接的健康状况。ActiveMQ Classic 支持 STOMP 心跳,前提是客户端使用协议的 1.1 版本(或更高版本)。
在 ActiveMQ Classic 5.9.0 之前:‘读取’心跳超时(即从客户端发送到代理的心跳)的强制执行是严格的。换句话说,代理对来自客户端的延迟到达的读取心跳不包容。这导致代理得出结论,客户端不再存在,当客户端未能遵守其配置的心跳设置时,导致代理关闭其对客户端连接的连接。
从 ActiveMQ Classic 5.9.0 开始:读取心跳的超时强制执行现在可以通过一个新的传输选项 transport.hbGracePeriodMultiplier
进行配置
<transportConnectors>
<transportConnector name="stomp" uri="stomp://127.0.0.1:61613?transport.hbGracePeriodMultiplier=1.5"/>
</transportConnectors>
此乘数用于计算代理将对每个客户端连接强制执行的有效读取心跳超时。乘数应用于客户端在其 CONNECT
帧中指定的读取超时间隔
<client specified read heart-beat interval> * <grace periodmultiplier> == <broker enforced read heart-beat timeout interval>
为了向后兼容,如果未配置宽限期乘数,则默认的强制执行模式将保持严格,例如 transport.hbGracePeriodMultiplier=1.0
。尝试将宽限期乘数配置为小于或等于 1.0
的值将被静默忽略。
希望对来自代理的延迟到达的心跳具有容忍性的 STOMP 客户端必须实现自己的解决方案来实现这一点。
-
有关心跳的详细信息,请查看 STOMP 规范
-
实现此功能的 JIRA:ActiveMQ Classic 5.x 不支持 STOMP 协议支持的心跳宽限期的概念
使用 Stomp 与目标进行交互
请注意,stomp 中的前缀 /queue/
或 /topic/
在传递给 ActiveMQ Classic 作为 JMS 目标之前将从字符串中删除。还要注意,MOM 系统中的默认分隔符是 .
(点)。虽然 FOO.BAR
是标识队列类型目标的正常语法,但 Stomp 等效项是 /queue/FOO.BAR
注意目标是否以
/
开头如果在 Stomp 世界中你使用
/queue/foo/bar
,那么在 JMS 世界中,队列将被称为foo/bar
,而不是/foo/bar
。
STOMP 中的持久消息
STOMP 消息默认情况下是非持久的。要使用持久消息,请将以下 STOMP 头添加到所有
SEND
请求:persistent:true
。此默认设置与 JMS 消息的默认设置相反。
使用 JMS 文本/字节消息和 Stomp
Stomp 是一种非常简单的协议,这也是它的优势所在!因此,它不了解 JMS 消息,例如 TextMessage
或 BytesMessage
。但是,该协议确实支持 content-length
头。为了在 STOMP 和 JMS 客户端之间提供更强大的交互,ActiveMQ Classic 会依赖于此头的包含,以确定在从 Stomp 发送到 JMS 时要创建的邮件类型。逻辑很简单
包含内容长度头 | 生成的邮件 |
---|---|
是 | BytesMessage |
否 | TextMessage |
从 JMS 到 Stomp 时也可以遵循相同的逻辑。可以编写一个 Stomp 客户端来依赖于 content-length
头的包含,以确定要向用户提供哪种类型的邮件结构。
邮件转换
SEND
和 SUBSCRIBE
邮件上的 transformation
邮件头可用于指示 ActiveMQ Classic 将邮件从文本转换为你想要的格式。目前,ActiveMQ Classic 附带一个转换器,可以将 XML/JSON 文本转换为 Java 对象,但你也可以添加自己的转换器。
以下是如何使用内置转换器(取自测试用例)的简要示例
private String xmlObject = "<pojo>\n"
+ " <name>Dejan</name>\n"
+ " <city>Belgrade</city>\n"
+ "</pojo>";
public void testTransformationReceiveXMLObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(message);
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:jms-object-xml\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
依赖项
ActiveMQ Classic 使用 XStream 来满足其转换需求。由于它是可选的依赖项,因此你必须将其添加到代理的类路径中,方法是将相应的 JAR 放入
lib/
文件夹中。此外,如果你计划使用 JSON 转换,则必须将 Jettison JSON 解析器添加到类路径中。
为了创建自己的转换器,你需要执行以下操作
-
通过实现 FrameTranslator 接口来构建你的转换器
-
通过创建以你想要在 JAR 的
META-INF/services/org/apache/activemq/transport/frametranslator/
文件夹中使用的值命名的文件来将其与相应的头值相关联,该文件将包含值class=_你的转换器的完全限定类名_
例如,内置转换器包含以下值
class=org.apache.activemq.transport.stomp.XStreamFrameTranslator
在 META-INF/services/org/apache/activemq/transport/frametranslator/jms-xml
文件中。
调试
如果你想调试代理和客户端之间的 Stomp 通信,你应该使用 trace
参数配置 Stomp 连接器,如下所示
<transportConnectors>
<transportConnector name="stomp" uri="stomp://127.0.0.1:61613?trace=true"/>
</transportConnectors>
这将指示代理跟踪它发送和接收的所有数据包。
此外,你必须为相应的日志启用跟踪。你可以通过将以下内容添加到你的 conf/log4j.properties
来实现这一点
log4j.logger.org.apache.activemq.transport.stomp=TRACE
最后,你可能希望将这些消息保存在单独的文件中,而不是污染标准代理日志。你可以使用以下 log4j 配置来实现这一点
log4j.appender.stomp=org.apache.log4j.RollingFileAppender
log4j.appender.stomp.file=${activemq.base}/data/stomp.log
log4j.appender.stomp.maxFileSize=1024KB
log4j.appender.stomp.maxBackupIndex=5
log4j.appender.stomp.append=true
log4j.appender.stomp.layout=org.apache.log4j.PatternLayout
log4j.appender.stomp.layout.ConversionPattern=%d \[%-15.15t\] %-5p %-30.30c{1} - %m%n
log4j.logger.org.apache.activemq.transport.stomp=TRACE, stomp
log4j.additivity.org.apache.activemq.transport.stomp=false
# Enable these two lines and disable the above two if you want the frame IO ONLY (e.g., no heart beat messages, inactivity monitor etc).
#log4j.logger.org.apache.activemq.transport.stomp.StompIO=TRACE, stomp
#log4j.additivity.org.apache.activemq.transport.stomp.StompIO=false
此后,你所有的 Stomp 数据包都将被记录到 data/stomp.log
中
Java API
从 ActiveMQ Classic 5.2 开始:有一个简单的 Java Stomp API 与 ActiveMQ Classic 一起分发。请注意,此 API 仅用于测试目的,你应该始终考虑从 Java 中使用标准 JMS API,而不是这个 API。以下代码片段提供了一个使用此 API 的简单示例
StompConnection connection = new StompConnection();
connection.open("localhost", 61613);
connection.connect("system", "manager");
StompFrame connect = connection.receive();
if(!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
throw new Exception ("Not connected");
}
connection.begin("tx1");
connection.send("/queue/test", "message1", "tx1", null);
connection.send("/queue/test", "message2", "tx1", null);
connection.commit("tx1");
connection.subscribe("/queue/test", Subscribe.AckModeValues.CLIENT);
connection.begin("tx2");
StompFrame message = connection.receive();
System.out.println(message.getBody());
connection.ack(message, "tx2");
message = connection.receive();
System.out.println(message.getBody());
connection.ack(message, "tx2");
connection.commit("tx2");
connection.disconnect();
此示例是标准 ActiveMQ Classic 发行版的一部分。您可以从 ./example
文件夹中运行它,方法是:
ant stomp
STOMP 扩展用于 JMS 消息语义
请注意,STOMP 的设计尽可能简单 - 因此任何脚本语言/平台都可以用最小的努力向任何其他平台发送消息。STOMP 允许在每个请求上插入可插拔的标头,例如发送和接收消息。ActiveMQ Classic 对 Stomp 协议有一些扩展,因此 Stomp 客户端可以支持 JMS 语义。OpenWire JMS 生产者可以向 Stomp 消费者发送消息,而 Stomp 生产者可以向 OpenWire JMS 消费者发送消息。Stomp 到 Stomp 配置可以使用更丰富的 JMS 消息控制。
STOMP 在 SENT
消息上支持以下标准 JMS 属性
STOMP 标头 | JMS 标头 | 描述 |
---|---|---|
correlation-id |
JMSCorrelationID |
良好的消费者会将此标头添加到他们发送的任何回复中。 |
expires |
JMSExpiration |
消息的过期时间。 |
JMSXGroupID |
JMSXGroupID |
指定消息组。 |
JMSXGroupSeq |
JMSXGroupSeq |
可选标头,指定消息组中的序列号。 |
persistent |
JMSDeliveryMode |
消息是否持久。 |
priority |
JMSPriority |
消息的优先级。 |
reply-to |
JMSReplyTo |
您应该将回复发送到的目标。 |
type |
JMSType |
消息的类型。 |
ActiveMQ Classic 对 STOMP 的扩展
您可以向 STOMP 命令添加自定义标头以配置 ActiveMQ Classic 协议。以下是一些示例
动词 | 标头 | 类型 | 描述 |
---|---|---|---|
CONNECT |
client-id |
字符串 |
指定 JMS clientID,该 ID 与 activemq.subcriptionName 组合使用以表示持久订阅者。 |
SUBSCRIBE |
activemq.dispatchAsync |
布尔值 |
消息应该从生产者线程同步或异步地分派到代理中的非持久主题吗?对于快速消费者,将此设置为 false 。对于慢速消费者,将其设置为 true ,以便分派不会阻塞快速消费者。 |
SUBSCRIBE |
activemq.exclusive |
布尔值 |
我想成为队列上的独占消费者。 |
SUBSCRIBE |
activemq.maximumPendingMessageLimit |
整数 |
对于非持久主题上的慢速消费者处理,通过丢弃旧消息 - 我们可以设置最大挂起限制,这样一旦慢速消费者备份到这个高水位标记,我们就开始丢弃旧消息。 |
SUBSCRIBE |
activemq.noLocal |
布尔值 |
指定是否应忽略本地发送的消息以进行订阅。设置为 true 以过滤掉本地发送的消息。 |
SUBSCRIBE |
activemq.prefetchSize |
整数 |
指定将分派到客户端的最大挂起消息数。一旦达到此最大值,就不会再分派任何消息,直到客户端确认一条消息为止。对于在处理消息可能很慢时跨消费者公平分配消息,将此设置为大于 1 的低值。注意:如果您的 STOMP 客户端是使用 Ruby 等动态脚本语言实现的,那么此参数必须设置为 1 ,因为没有客户端消息大小的概念。STOMP 不支持 0 的值。 |
SUBSCRIBE |
activemq.priority |
字节 |
设置消费者的优先级,以便可以按优先级顺序加权分派。 |
SUBSCRIBE |
activemq.retroactive |
布尔值 |
对于非持久主题,使此订阅具有追溯性。 |
SUBSCRIBE |
activemq.subscriptionName |
字符串 |
对于持久主题订阅,您必须在连接上指定相同的 activemq.client-id 并在 v5.7.0 之前在订阅上指定 activemq.subcriptionName 。注意:拼写 subcriptionName 而不是 subscriptionName 。这不是直观的,但它是在 ActiveMQ Classic 4.x 中实现的方式。对于 ActiveMQ Classic 的 5.0 版本,将支持 subcriptionName 和 subscriptionName (从 v5.6.0 开始删除了 subcriptionName )。 |
SUBSCRIBE |
selector |
字符串 |
使用 JMS 1.1 规范中指定的 SQL 92 语法指定 JMS 选择器。这允许在订阅时对每条消息应用过滤器。 |