MQTT 是一种轻量级、客户端到服务器的发布/订阅消息协议。MQTT 的设计目的是为了减少传输开销(从而减少网络流量)和客户端设备上的代码占用空间。出于这个原因,MQTT 非常适合受限设备,例如传感器和执行器,并且正在迅速成为物联网的事实标准通信协议。

Apache ActiveMQ Artemis 支持以下 MQTT 版本(包含指向各自规范的链接)

默认情况下,已配置 acceptor 元素以在端口 616161883 上接受 MQTT 连接。

有关为 MQTT 配置 acceptor 的详细信息,请参见常规 协议和互操作性 章节。

有关此功能的一些实际操作示例,请参阅 MQTT 示例

1. MQTT 服务质量

MQTT 提供 3 种服务质量级别。

每条消息(或主题订阅)都可以定义与其关联的服务质量,该服务质量与之关联。主题上定义的服务质量级别是客户端愿意接受的最高级别。消息上的服务质量级别是此消息的所需服务质量级别。代理将尝试根据消息和主题订阅上定义的内容,以最高的服务质量级别将消息传递给订阅者。

每个服务质量级别都提供一定级别的保证,用于发送或接收消息

  • QoS 0:最多一次

    保证订阅者最多只接收一次特定消息。这意味着消息可能永远不会到达。发送方和接收方将尝试传递消息,但如果出现故障并且消息没有到达目的地(例如由于网络连接),则消息可能会丢失。此 QoS 的网络流量开销最小,对客户端和代理的负担最小,通常用于一些数据丢失并不重要的遥测数据。

  • QoS 1:至少一次

    保证消息将到达其预期接收者一次或多次。发送方将继续发送消息,直到从接收方收到确认,确认已收到消息。此 QoS 的结果是,接收方可能会多次收到消息,并且还会增加比 QoS 0 更高的网络开销(由于确认)。此外,发送方会承受更大的负担,因为它需要存储消息并在合理时间内未能收到确认的情况下重试。

  • QoS 2:仅一次

    这是 QoS 中开销最大的一种(就网络流量和对发送方和接收方的负担而言),此 QoS 将确保接收方恰好只接收一次消息。这确保了接收方永远不会收到消息的任何重复副本,并且最终会收到消息,但需要在发送方和接收方上增加网络开销和复杂性。

2. MQTT 保留消息

MQTT 具有一个有趣的特性,即消息可以针对特定地址“保留”。这意味着,一旦将保留消息发送到地址,任何新订阅该地址的订阅者都将在任何其他消息之前收到上次发送的保留消息。即使保留消息是在客户端连接或订阅之前发送的,也会发生这种情况。此功能可能在物联网等环境中很有用,在这种环境中,设备需要在加入系统时快速获取系统的当前状态。

保留消息存储在名为特殊前缀的队列中,该前缀根据它们最初发送到的主题的名称而定。例如,发送到主题 /abc/123 的保留消息将存储在名为 $sys.mqtt.retain.abc.123 的多播队列中,该队列具有相同的名称地址。MQTT 规范没有定义保留消息应存储多长时间,因此代理将保留这些数据,直到客户端显式删除保留消息或它可能过期。但是,即使在那个时候,保留消息的队列和地址仍然会保留。可以使用以下 address-setting 自动删除这些资源

<address-setting match="$sys.mqtt.retain.#">
   <auto-delete-queues>true</auto-delete-queues>
   <auto-delete-addresses>true</auto-delete-addresses>
</address-setting>

请记住,还可以自动将 expiry-delay 应用于保留消息。

3. 遗嘱消息

当客户端最初连接到代理时,可以发送遗嘱消息。客户端能够在连接数据包中设置“遗嘱消息”。如果客户端异常断开连接,例如由于设备或网络故障,代理将继续将遗嘱消息发布到指定地址(如连接数据包中定义的那样)。遗嘱主题的其他订阅者将收到遗嘱消息,并且可以相应地做出反应。此功能在物联网风格的场景中可能很有用,用于检测可能在设备大规模部署中的错误。

4. 调试日志

通过为 org.apache.activemq.artemis.core.protocol.mqtt 启用 TRACE 日志记录,可以激活详细的协议日志记录(例如,进出数据包)。请按照 这些步骤 适当地配置日志记录。

MQTT 规范没有规定客户端发布的有效负载格式。就代理而言,有效负载只是一个字节数组。但是,为了便于日志记录,代理会将有效负载编码为 UTF-8 字符串并打印它们,最多 256 个字符。有效负载日志记录仅限于避免用可能数百兆字节的无用信息填充日志。

5. 持久订阅

MQTT 会话的订阅信息存储在名为 $sys.mqtt.sessions 的内部队列中,并持久化到存储(假设已启用持久性)。信息是持久的,以便 MQTT 订阅者可以在代理重新启动、故障等之后重新连接并无缝地恢复其订阅。当代理配置为高可用性时,此信息将在备份上可用,因此即使在代理故障转移的情况下,订阅者也能够恢复其订阅。

尽管持久订阅很方便,但它们会造成性能损失,因为数据必须写入存储。如果您不需要这种便利(例如,您始终使用干净会话)并且您不想承受性能损失,那么您可以在 broker.xml 中为 $sys.mqtt.sessions 队列禁用持久性,从而禁用它,例如:

<addresses>
   ...
   <address name="$sys.mqtt.sessions">
      <anycast>
         <queue name="$sys.mqtt.sessions">
            <durable>false</durable>
         </queue>
      </anycast>
   </address>
   ...
</addresses>

设置 mqtt-session-state-persistence-timeout 控制代理在抛出错误之前等待数据写入存储的时间。以毫秒为单位。默认值为 5000

6. 自定义客户端 ID 处理

MQTT 应用程序使用的客户端 ID 非常重要,因为它唯一地标识应用程序。在某些情况下,代理管理员可能希望执行额外的验证,甚至修改传入的客户端 ID 以支持特定用例。这可以通过实现自定义安全管理器来实现,如 security-manager 示例 中所示。

最简单的实现就像 security-manager 示例使用的“包装器”一样。在 authenticate 方法中,您可以使用传递的 org.apache.activemq.artemis.spi.core.protocol.RemotingConnection 上的 setClientId() 修改客户端 ID。如果您对客户端 ID 执行了一些自定义验证,则可以通过抛出 org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException 来拒绝客户端 ID。

7. 通配符订阅

MQTT 为主题过滤器定义了特殊的通配符语法。此定义位于 3.1.15 规范的第 4.7.1 节中。MQTT 主题类似于文件系统,是分层的,并且它们使用特殊字符(默认情况下为 /)来分隔分层级别。订阅者可以订阅特定主题或订阅整个层次结构的分支。

要订阅地址层次结构的分支,订阅者可以使用通配符。MQTT 中有两种类型的通配符

  • 多级 (#)

    将此通配符添加到地址将匹配指定节点下地址层次结构的所有分支。例如:/uk/ 将匹配 /uk/cities/uk/cities/newcastle 以及 /uk/rivers/tyne。订阅地址 将导致订阅代理中的所有主题。这很有用,但应谨慎操作,因为它会对性能产生重大影响。

  • 单级 (+)

    匹配地址层次结构中的单个级别。例如 /uk/+/stores 将匹配 /uk/newcastle/stores,但不匹配 /uk/cities/newcastle/stores

这非常接近默认的 通配符语法,但并不完全相同。因此,需要进行一些转换。这种转换不是免费的,因此如果您想要最佳的 MQTT 性能,请使用 broker.xml 将通配符语法配置为与 MQTT 的语法匹配,例如:

<core>
   ...
   <wildcard-addresses>
      <delimiter>/</delimiter>
      <any-words>#</any-words>
      <single-word>+</single-word>
   </wildcard-addresses>
   ...
</core>

当然,更改默认语法也意味着其他协议上的其他客户端也需要遵循相同的语法,以及 address-setting 配置元素的 match 值。

8. Web 套接字

Apache ActiveMQ Artemis 还支持通过 Web 套接字 使用 MQTT。

通过正常的 MQTT 接受器支持通过 Web 套接字使用 MQTT

<acceptor name="mqtt-ws-acceptor">tcp://host:1883?protocols=MQTT</acceptor>

使用此配置,Apache ActiveMQ Artemis 将在端口 1883 上通过 Web 套接字接受 MQTT 连接。然后,Web 浏览器可以使用 Web 套接字连接到 ws://<server>:1883 以发送和接收 MQTT 消息。

SSL/TLS 也可用,例如:

<acceptor name="mqtt-wss-acceptor">tcp://host:8883?protocols=MQTT;sslEnabled=true;keyStorePath=/path/to/keystore;keyStorePassword=myPass</acceptor>

然后,Web 浏览器可以使用 Web 套接字连接到 wss://<server>:8883 以发送和接收 MQTT 消息。

MQTT 规范定义了一种通常称为“链接窃取”的行为。这意味着,每当新的客户端使用与其他现有客户端相同的客户端 ID 连接时,现有客户端的会话将被关闭,其网络连接将被终止。

在某些用例中,不希望出现这种行为,因此它是可配置的。可以在 MQTT acceptor 上配置 URL 参数 allowLinkStealing 以修改此行为。默认情况下,allowLinkStealingtrue。如果将其设置为 false,那么每当新的客户端使用与其他现有客户端相同的客户端 ID 连接时,新的 客户端的会话将被关闭,其网络连接将被终止。对于 MQTT 5 客户端,它们将收到 0x80(即“未指定错误”) 的断开连接原因代码。

10. 自动订阅清理

有时使用CleanSession=false的MQTT 3.x客户端无法正确取消订阅。可以在MQTT acceptor上配置URL参数defaultMqttSessionExpiryInterval,以便在过期时间间隔过后自动清理放弃的会话和订阅队列。

MQTT 5具有相同的基本语义,但配置略有不同。CleanSession标志被替换为CleanStart,以及一个会话过期时间间隔属性。代理将使用客户端的会话过期时间间隔(如果设置)。如果没有设置,代理将应用defaultMqttSessionExpiryInterval

默认的defaultMqttSessionExpiryInterval-1,这意味着对于不传递其自身会话过期时间间隔的MQTT 3.x客户端或MQTT 5客户端,将不会进行任何清理。否则,它表示客户端断开连接后必须经过的秒数,然后代理才会删除会话状态和订阅队列。

默认情况下,MQTT 会话状态每 5000 毫秒扫描一次。可以使用broker.xmlcore部分中设置的mqtt-session-scan-interval元素更改此值。

11. 流量控制

MQTT 5 引入了简单的流量控制形式。简而言之,代理可以告诉客户端在收到确认之前可以接收多少个 QoS 1 和 2 消息,反之亦然。

这通过在broker.xml的 MQTT acceptor 上设置 receiveMaximum URL 参数来控制。

默认值为65535(MQTT 使用的 2 字节整数的最大值)。

MQTT 5 规范禁止使用0值。

-1的值将阻止代理通知客户端任何接收最大值,这意味着从客户端到代理的流量控制将被禁用。这实际上与将值设置为65535相同,但会减少CONNACK数据包的大小几个字节。

12. 主题别名最大值

MQTT 5 引入了主题别名。这是一种针对PUBLISH控制数据包大小的优化,因为现在可以使用一个 2 字节整数来代替主题名称(可能非常长)。

客户端和代理都可以相互通知他们支持的最大别名值(即他们支持多少个不同的别名)。这通过在 MQTT 客户端使用的acceptor上使用topicAliasMaximum URL 参数来控制。

默认值为65535(MQTT 使用的 2 字节整数的最大值)。

0的值将禁用从客户端到代理的主题别名。

-1的值将阻止代理通知客户端任何主题别名最大值,这意味着从客户端到代理的别名将被禁用。这实际上与将值设置为0相同,但会减少CONNACK数据包的大小几个字节。

13. 最大数据包大小

MQTT 5 引入了最大数据包大小。这是服务器或客户端愿意接受的最大数据包大小。

这通过在broker.xml的 MQTT acceptor 上设置 maximumPacketSize URL 参数来控制。

默认值为268435455(即 256MB - MQTT 使用的可变字节整数的最大值)。

MQTT 5 规范禁止使用0值。

-1的值将阻止代理通知客户端任何最大数据包大小,这意味着不会对传入数据包的大小进行限制。这也减少了CONNACK数据包的大小几个字节。

14. 服务器保持活动

所有 MQTT 版本都支持由客户端定义的连接保持活动值。MQTT 5 引入了服务器保持活动值,以便代理可以定义客户端应使用的值。服务器保持活动的主要用途是让服务器通知客户端,它将比客户端指定的保持活动值更早地由于不活动而断开客户端的连接。

这通过在broker.xml的 MQTT acceptor 上设置 serverKeepAlive URL 参数来控制。

默认值为60,以为单位。

0的值将完全禁用保持活动,无论客户端的保持活动值如何。不建议这样做,因为禁用保持活动通常被认为很危险,因为它可能导致资源耗尽。

-1的值意味着代理将始终接受客户端的保持活动值(即使该值为0)。

任何其他值都意味着如果serverKeepAlive小于客户端的保持活动值,则将应用serverKeepAlive除非客户端的保持活动值为0,在这种情况下将应用serverKeepAlive。这是因为0的值将禁用保持活动,而禁用保持活动通常被认为很危险,因为它可能导致资源耗尽。

15. 增强身份验证

MQTT 5 引入了增强身份验证,它扩展了现有的名称和密码身份验证,包括质询/响应式身份验证。

但是,目前还没有实现任何质询/响应机制,因此如果客户端在CONNECT数据包中传递“身份验证方法”属性,它将收到一个带有0x8C原因代码的CONNACK(即身份验证方法错误),并且网络连接将被关闭。

16. 发布授权失败

MQTT 3.1.1 规范在代理由于缺乏授权而导致PUBLISH数据包失败时的行为方面含糊不清。在第 3.3.5 节中,它指出

如果服务器实现不允许客户端执行 PUBLISH;它无法通知该客户端。它必须根据正常的 QoS 规则进行肯定确认,或者关闭网络连接

默认情况下,代理将关闭网络连接。但是,如果您希望代理进行肯定确认,请在broker.xml中将 URL 参数closeMqttConnectionOnPublishAuthorizationFailure设置为false,并将该参数设置在相关的 MQTT acceptor上,例如

<acceptor name="mqtt">tcp://0.0.0:1883?protocols=MQTT;closeMqttConnectionOnPublishAuthorizationFailure=false</acceptor>