咨询消息

功能 > 消息功能 > 咨询消息

ActiveMQ Classic 支持咨询消息,允许您使用常规 JMS 消息监视系统。目前,我们有支持以下内容的咨询消息:

  • 消费者、生产者和连接启动和停止
  • 创建和销毁临时目标
  • 主题和队列上的消息过期
  • 代理向没有消费者的目标发送消息。
  • 连接启动和停止

咨询消息可以被认为是一种管理通道,您可以在其中接收有关 JMS 提供程序上发生的事情以及生产者、消费者和目标上发生的事情的信息。 当您通过 JMX 查看代理时,您将看到以 ActiveMQ.Advisory. 为前缀的咨询主题。

每个咨询都具有消息 类型 Advisory 和一些预定义的消息属性

属性 类型 描述 版本
originBrokerId StringProperty 咨询消息起源的代理 ID。 5.x
originBrokerName StringProperty 咨询消息起源的代理名称。 5.x
originBrokerURL StringProperty 咨询消息起源的代理的第一个 URL。 5.2

此外,一些消息会携带一个 Command 对象,该对象包含有关咨询性质的额外信息,例如,对每个目标的订阅会返回一个 ActiveMQMessage。 可以使用 ActiveMQMessage.getDataStructure() 检索特定 DataStructure 对象,例如 ConsumerInfoProducerInfoConnectionInfo

例如

Destination advisoryDestination = AdvisorySupport.getProducerAdvisoryTopic(destination)
MessageConsumer consumer = session.createConsumer(advisoryDestination);
consumer.setMessageListener(this);

public void onMessage(Message msg){
    if(msg instanceof ActiveMQMessage) {
        try {
             ActiveMQMessage aMsg = (ActiveMQMessage)msg;
             ProducerInfo prod = (ProducerInfo) aMsg.getDataStructure();
        } 
        catch(JMSException e) {
            log.error("Failed to process message: " + msg);
        }
    }
}

支持以下咨询主题

基于客户端的咨询

提示

这些总是生成的。

咨询主题 描述 属性 数据结构
ActiveMQ.Advisory.Connection 连接启动和停止消息。   ConnectionInfoRemoveInfo
ActiveMQ.Advisory.Producer.Queue 队列上的生产者启动和停止消息。 String='producerCount' - 生产者的数量 ProducerInfo
ActiveMQ.Advisory.Producer.Topic 主题上的生产者启动和停止消息。 String='producerCount' - 生产者的数量 ProducerInfo
ActiveMQ.Advisory.Consumer.Queue 队列上的消费者启动和停止消息。 String='consumerCount' - 消费者的数量 ConnectionInfoRemoveInfo
ActiveMQ.Advisory.Consumer.Topic 主题上的消费者启动和停止消息。 String='consumerCount' - 消费者的数量 ConnectionInfoRemoveInfo

请注意,消费者启动/停止咨询消息还具有一个 consumerCount 标头,以指示发送咨询消息时目标上活动消费者的数量。 这意味着您可以使用以下选择器在给定目标上没有活动消费者时收到通知

consumerCount = 0

基于目标和消息的咨询

咨询主题 描述 属性 数据结构 默认 PolicyEntry 属性
ActiveMQ.Advisory.Queue 队列创建和销毁。 null DestinationInfo true none
ActiveMQ.Advisory.Topic 主题创建和销毁。 null DestinationInfo true none
ActiveMQ.Advisory.TempQueue 临时队列创建和销毁。 null DestinationInfo true none
ActiveMQ.Advisory.TempTopic 临时主题创建和销毁。 null DestinationInfo true none
ActiveMQ.Advisory.Expired.Queue 队列上的过期消息。 String='orignalMessageId' - 过期的 ID Message true none
ActiveMQ.Advisory.Expired.Topic 主题上的过期消息。 String='orignalMessageId' - 过期的 ID Message true none
ActiveMQ.Advisory.NoConsumer.Queue 没有可用的消费者来处理发送到队列的消息。 null Message false sendAdvisoryIfNoConsumers
ActiveMQ.Advisory.NoConsumer.Topic 没有可用的消费者来处理发送到主题的消息。 null Message false sendAdvisoryIfNoConsumers

提示

NoConsumer 主题咨询仅针对非持久性消息发送。

5.2 版本中的新咨询

咨询主题 描述 属性 数据结构 默认 PolicyEntry 属性
ActiveMQ.Advisory.SlowConsumer.Queue 队列消费者速度慢。 String='consumerId' - 消费者 ID ConsumerInfo false advisoryForSlowConsumers
ActiveMQ.Advisory.SlowConsumer.Topic 主题消费者速度慢。 String='consumerId' - 消费者 ID ConsumerInfo false advisoryForSlowConsumers
ActiveMQ.Advisory.FastProducer.Queue 队列生产者速度快。 String='producerId' - 生产者 ID ProducerInfo false advisoryForFastProducers
ActiveMQ.Advisory.FastProducer.Topic 主题生产者速度快。 String='consumerId'’ - 生产者 ID ProducerInfo false advisoryForFastProducers
ActiveMQ.Advisory.MessageDiscarded.Queue 消息被丢弃。 String='orignalMessageId' - 被丢弃的 ID Message false advisoryForDiscardingMessages
ActiveMQ.Advisory.MessageDiscarded.Topic 消息被丢弃。 String='orignalMessageId' - 被丢弃的 ID Message false advisoryForDiscardingMessages
ActiveMQ.Advisory.MessageDelivered.Queue 消息已传递到代理。 String='orignalMessageId' - 已传递的 ID Message false advisoryForDelivery
ActiveMQ.Advisory.MessageDelivered.Topic 消息已传递到代理。 String='orignalMessageId' - 已传递的 ID Message false advisoryForDelivery
ActiveMQ.Advisory.MessageConsumed.Queue 消息被客户端消费。 String='orignalMessageId' - 已传递的 ID Message false advisoryForConsumed
ActiveMQ.Advisory.MessageConsumed.Topic 消息被客户端消费。 String='orignalMessageId' - 已传递的 ID Message false advisoryForConsumed
ActiveMQ.Advisory.FULL 使用资源已达到其限制。 String='usageName' - 使用资源的名称 null false advisoryWhenFull
ActiveMQ.Advisory.MasterBroker 代理现在是主/从配置中的主代理。 null null true none

5.4 版本中的新咨询

咨询主题 描述 属性 数据结构 默认 PolicyEntry 属性
ActiveMQ.Advisory.MessageDLQd.Queue 消息发送到 DLQ。 String='orignalMessageId' - 已传递的 ID Message 始终开启 advisoryForConsumed
ActiveMQ.Advisory.MessageDLQd.Topic 消息发送到 DLQ。 String='orignalMessageId' - 已传递的 ID Message 始终开启 advisoryForConsumed

网络桥接咨询

从 ActiveMQ Classic 5.5 版本开始,您可以监视与网络桥接状态相关的事件的咨询主题。 网络桥接启动或停止时,您可以获得咨询消息。

咨询主题 描述 属性 数据结构 默认
ActiveMQ.Advisory.NetworkBridge 网络桥接正在停止或启动。 Boolean="started" - true 如果桥接已启动,false 如果它已停止。 Boolean="createdByDuplex" - true 如果桥接由远程网络连接器创建。 BrokerInfo - 提供远程代理的数据 始终开启

启用默认情况下禁用的咨询

默认情况下未打开的咨询(请参阅最后一列)可以在 ActiveMQ Classic 代理配置中的 PolicyEntry 上启用,例如,要启用消息消费咨询,您可以配置以下内容

<destinationPolicy>
    <policyMap>
       <policyEntries> 
           <policyEntry topic=">" advisoryForConsumed="true"/>
       </policyEntries>
    </policyMap>
</destinationPolicy>

提示

> 字符匹配所有主题 - 您可以使用通配符匹配来设置目标策略 - 请参阅 通配符

禁用咨询消息

咨询消息的使用会导致内存和连接资源方面的一点开销,这与系统中的目标数量有关。 在某些情况下,禁用所有咨询可能很有意义。

咨询需要在代理上禁用,通过 XML 配置

<broker advisorySupport="false">

或从 Java 代码

BrokerService broker = new BrokerService();
broker.setAdvisorySupport(false);
// ...
broker.start();

以及在您的 ActiveMQConnectionFactory 上(因为对咨询主题的订阅会自动创建它)通过 brokerUrl

tcp://127.0.0.1:61616?jms.watchTopicAdvisories=false

或通过使用 ActiveMQConnectionFactory 上的 watchTopicAdvisories 属性的 Java 代码。

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setWatchTopicAdvisories(false);

警告

咨询消息是 动态网络代理拓扑 所必需的,因为 NetworkConnectors 会订阅咨询消息。 如果没有咨询消息,则网络必须进行静态配置。

使用目标

以上所有目标实际上都是前缀,这些前缀会附加重要的信息(如实际的主题或队列、客户端 ID、生产者 ID、消费者 ID 等)。 这使您可以重复使用发布/订阅、通配符选择器 的功能来按需筛选咨询消息。

例如,如果您想订阅主题 FOO.BAR 上的过期消息,您可以订阅 ActiveMQ.Advisory.Expired.Topic.FOO.BAR。 要订阅特定类型咨询的所有消息,只需在主题后附加 .>,例如,要订阅所有对主题和队列启动和停止的消费者,请订阅 ActiveMQ.Advisory.Consumer..>

帮助方法

用于获取咨询目标对象的方法在 AdvisorySupport 中通过以下方法提供。

  • AdvisorySupport.getConsumerAdvisoryTopic()
  • AdvisorySupport.getProducerAdvisoryTopic()
  • AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
  • AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
  • AdvisorySupport.getNoTopicConsumersAdvisoryTopic()
  • AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
  • AdvisorySupport.getDestinationAdvisoryTopic()
  • AdvisorySupport.getExpiredQueueMessageAdvisoryTopic()
  • AdvisorySupport.getExpiredTopicMessageAdvisoryTopic()
  • AdvisorySupport.getNoQueueConsumersAdvisoryTopic()
  • AdvisorySupport.getNoTopicConsumersAdvisoryTopic()

// 5.2 版本及以上

  • AdvisorySupport.getSlowConsumerAdvisoryTopic()
  • AdvisorySupport.getFastProducerAdvisoryTopic()
  • AdvisorySupport.getMessageDiscardedAdvisoryTopic()
  • AdvisorySupport.getMessageDeliveredAdvisoryTopic()
  • AdvisorySupport.getMessageConsumedAdvisoryTopic()
  • AdvisorySupport.getMasterBrokerAdvisoryTopic()
  • AdvisorySupport.getFullAdvisoryTopic()

一些用于处理咨询消息的帮助类在 advisories 包中提供。

Apache, ActiveMQ, Apache ActiveMQ、Apache 羽毛徽标和 Apache ActiveMQ 项目徽标是 Apache 软件基金会的商标。 版权所有 © 2024,Apache 软件基金会。 根据 Apache 许可证 2.0 获得许可。