Apache ActiveMQ ™ -- 处理咨询消息

ActiveMQ 支持咨询消息,允许您使用常规 CMS 消息来监控系统。目前,您可以使用咨询消息执行以下几项操作:

  • 查看消费者、生产者和连接的启动和停止
  • 查看临时目标的创建和销毁
  • 在主题和队列上收到有关消息过期的通知
  • 观察代理将消息发送到没有消费者的目标。
  • 查看连接的启动和停止

可以将咨询消息视为一种管理通道,您将通过它接收有关 JMS 提供商上发生的事情以及生产者、消费者和目标的发生情况的信息。要详细了解代理的咨询消息支持,请参阅此文章

开始之前

本教程假设读者熟悉 CMS API,并了解如何使用 ActiveMQ-CPP 库构建基本应用程序。如果您不确定如何使用 CMS API,则应首先阅读CMS API 概述。本教程是针对 ActiveMQ-CPP 3.0 版 API 编写的,尽管可以使用旧的 ActiveMQ-CPP 2.x 客户端处理咨询消息,但本文未显示一些代码差异。

订阅咨询主题

要接收咨询消息,您首先需要订阅提供所需咨询消息的主题。您可以像订阅任何其他目标一样订阅这些主题,诀窍是使用要使用的主题的正确名称。让我们先看看一些可供使用的类型(这不是完整列表)。

咨询主题 描述 属性 数据结构
ActiveMQ.Advisory.Connection 连接启动和停止消息    
ActiveMQ.Advisory.Producer.Queue 在队列上启动和停止生产者消息 String=’producerCount’ - 生产者数量 ProducerInfo
ActiveMQ.Advisory.Producer.Topic 在主题上启动和停止生产者消息 String=’producerCount’ - 生产者数量 ProducerInfo
ActiveMQ.Advisory.Consumer.Queue 在队列上启动和停止消费者消息 String=’consumerCount’ - 消费者数量 ConsumerInfo
ActiveMQ.Advisory.Consumer.Topic 在主题上启动和停止消费者消息 String=’consumerCount’ - 消费者数量 ConsumerInfo

现在,查看上面的列表,让我们选择其中一个主题,并弄清楚如何创建 CMS 主题以订阅,以便接收咨询消息。使用 Java 客户端,我们可以使用实用程序类 AdvisorySupport 创建目标,但目前 ActiveMQ-CPP 未提供此类实用程序类,希望我们会在将来的版本中看到它(当然始终欢迎贡献!)。

如果我们有一个名为 TOPIC.FOO 的主题,并且想知道何时生产者订阅该主题,我们需要创建一个名称为 ActiveMQ.Advisory.Producer.Topic.TOPIC.FOO 的主题对象,以便接收我们感兴趣的咨询消息。我们之所以知道这一点,是因为我们可以查看上面的表格,并看到 ActiveMQ.Advisory.Producer.Topic 在生产者开始或停止发布消息到主题时会收到通知,我们还知道我们的主题名为 TOPIC.FOO,因此将它们加在一起就可以得到咨询主题的名称,我们之所以知道这一点,是因为我们查看了 AdvisorySupport.java 类,并且,不,这不是作弊。以下代码片段显示了使用 CMS 会话创建主题。

std::auto_ptr<cms::Topic> advisories( session->createTopic(
    "ActiveMQ.Advisory.Producer.Topic.TOPIC.FOO" ) );

创建完要监听的咨询消息的主题后,我们只需创建一个消费者来监听它们即可,以下代码片段演示了这一点。

std::auto_ptr<cms::MessageConsumer> consumer;
consumer.reset( session->createConsumer( advisories.get() ) );
consumer->setMessageListener( this );

如您所见,咨询主题订阅与 CMS 中任何其他目标的订阅之间没有区别。在上面的示例中,我们注册为异步侦听器,您也可以使用正常的阻塞 receive 方法,但我们更喜欢这种方法。

处理传入的咨询消息

每个咨询都具有消息类型“Advisory”以及一些预定义的消息属性,要检查 CMS 消息的这种类型,您需要在 Message 对象上调用 getCMSType 方法。在某些情况下,您会知道要接收的唯一消息是咨询消息,因为您的客户端只订阅咨询主题,而其他情况下,您可能将多个 MessageConsumer 连接到同一个 MessageListener,在这种情况下,您将不得不检查消息类型。一旦您知道自己正在处理咨询消息,就可以开始检查它以确定它是什么类型的消息,并提取对您的应用程序有意义的数据。

许多咨询消息将有意义的数据存储在消息属性中,例如,消费者启动/停止咨询消息包含一个键为 consumerCount 的元素,该元素填充了当前在相关主题或队列上的活动消费者数量。现在,让我们看一个代码片段,该代码片段检查在 onMessage 回调中接收的消息,以查看它是否是咨询消息,如果是,则对其进行操作。

void AdvisoryProducer::onMessage( const cms::Message* message ) {

   if( message->getCMSType() == "Advisory" ) {

       std::cout << "Received an Advisory Message!" << std::endl;

       if( message->propertyExists( "consumerCount" ) ) {

           std::string consumerCount = message->getStringProperty( "consumerCount" );
           std::cout << "Number of Consumers = " << consumerCount << std::endl;

           // Do Something Meaningful here....
       }

   } else {
       std::cout << "Received a Non-Advisory Message!" << std::endl;
   }
}

完整示例:仅在有消费者时才生产的生产者

现在您已经了解了咨询消息处理的基本知识,现在该向您展示一个完整的示例,以演示您可以使用咨询消息完成的操作。以下代码显示了一个类头文件和源文件,它们实现了一个基本的 CMS 生产者,该生产者仅在有活动消费者时才向名为 HEART-BEAT-CHANNEL 的主题发送心跳消息,否则它将处于空闲状态。

#ifndef _ACTIVEMQCPP_EXAMPLES_ADVISORIES_ADVISORYPRODUCER_H_
#define _ACTIVEMQCPP_EXAMPLES_ADVISORIES_ADVISORYPRODUCER_H_

#include <string>
#include <memory>

#include <cms/Session.h>
#include <cms/MessageProducer.h>
#include <cms/MessageConsumer.h>
#include <cms/MessageListener.h>

#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>

namespace activemqcpp {
namespace examples {
namespace advisories {

    /**
     * A sample Producer that will only send Messages on its Topic when it has
     * received an advisory indicating that there is an active MessageConsumer
     * on the Topic.  Once another message comes in indicating that there is no
     * longer a consumer then this Producer stops producing again.
     *
     * @since 3.0
     */
    class AdvisoryProducer : public decaf::lang::Runnable,
                             public cms::MessageListener {
    private:

        volatile bool consumerOnline;
        volatile bool shutdown;
        decaf::util::concurrent::CountDownLatch shutdownLatch;

        cms::Session* session;
        std::auto_ptr<cms::MessageConsumer> consumer;
        std::auto_ptr<cms::MessageProducer> producer;

    public:

        AdvisoryProducer( cms::Session* session );
        virtual ~AdvisoryProducer();

        /**
         * Shut down the processing that occurs in the Run method.
         */
        void stop();

        /**
         * Run the producer code.
         */
        virtual void run();

        /**
         * Async Message callback.
         */
        virtual void onMessage( const cms::Message* message );

    };

}}}

#endif /* _ACTIVEMQCPP_EXAMPLES_ADVISORIES_ADVISORYPRODUCER_H_ */

AdvisoryProducer 源文件

#include "AdvisoryProducer.h"

#include <cms/Topic.h>
#include <cms/Message.h>
#include <cms/TextMessage.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/Integer.h>

using namespace std;
using namespace activemqcpp;
using namespace activemqcpp::examples;
using namespace activemqcpp::examples::advisories;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;

////////////////////////////////////////////////////////////////////////////////
AdvisoryProducer::AdvisoryProducer( cms::Session* session ) : shutdownLatch(1) {

    if( session == NULL ) {
        throw NullPointerException(
            __FILE__, __LINE__, "Session Object passed was Null." );
    }

    std::auto_ptr<cms::Topic> destination( session->createTopic(
        "HEART-BEAT-CHANNEL" ) );
    std::auto_ptr<cms::Topic> advisories( session->createTopic(
        "ActiveMQ.Advisory.Consumer.Topic.HEART-BEAT-CHANNEL" ) );

    this->shutdown = false;
    this->consumerOnline = false;

    this->session = session;
    this->producer.reset( session->createProducer( destination.get() ) );
    this->consumer.reset( session->createConsumer( advisories.get() ) );
    this->consumer->setMessageListener( this );
}

////////////////////////////////////////////////////////////////////////////////
AdvisoryProducer::~AdvisoryProducer() {
}

////////////////////////////////////////////////////////////////////////////////
void AdvisoryProducer::stop() {
    this->shutdown = true;
    this->shutdownLatch.await( 3000 );
}

////////////////////////////////////////////////////////////////////////////////
void AdvisoryProducer::run() {

    while( !this->shutdown ) {

        if( this->consumerOnline ) {

            std::auto_ptr<cms::TextMessage> message(
                this->session->createTextMessage( "Alive" ) );

            this->producer->send( message.get() );

            Thread::sleep( 1000 );
        }
    }

    this->shutdownLatch.countDown();
}

////////////////////////////////////////////////////////////////////////////////
void AdvisoryProducer::onMessage( const cms::Message* message ) {

    if( message->getCMSType() == "Advisory" ) {

        std::cout << "Received an Advisory Message!" << std::endl;

        if( message->propertyExists( "consumerCount" ) ) {

            std::string consumerCount = message->getStringProperty( "consumerCount" );
            std::cout << "Number of Consumers = " << consumerCount << std::endl;
            this->consumerOnline = Integer::parseInt( consumerCount ) > 0 ? true : false;
        }

    } else {
        std::cout << "Received a Non-Advisory Message!" << std::endl;
    }
}

高级主题 处理咨询中的命令对象

如果您阅读了本教程开头提到的 ActiveMQ 咨询消息文章,那么您就会知道某些咨询消息可以包含一个嵌入式命令对象。如果您没有阅读该文章,那么本节将非常令人困惑,所以请阅读它。我们可以通过一些工作在 CMS 中访问这些命令对象,这意味着我们可以充分利用咨询消息功能。

所有咨询消息都作为基本 ActiveMQMessage 发送到您的客户端。ActiveMQ-CPP 中的底层类型层次结构与 ActiveMQ 相同,因此您在咨询文章中看到的嵌入式命令对象的名称相同,并且它们包含几乎相同的信息,尽管有时信息会以更适合 C++ 或更不友好的方式进行编码,具体取决于您的观点。

为了演示如何访问命令对象,让我们尝试创建一个客户端应用程序,该应用程序监听代理以获取指示临时目标已创建或销毁的咨询。每当创建或销毁相应的临时目标时,代理都会向“ActiveMQ.Advisory.TempTopic”和“ActiveMQ.Advisory.TempQueue”主题发布咨询消息,并且命令对象将是 DestinationInfo 类型。DestinationInfo 对象包含一个 Destination 对象,该对象描述了相关目标,以及一个 Operation Type 值,指示命令是创建命令还是销毁命令。首先让我们看看如何订阅此咨询主题。

订阅复合咨询主题

std::auto_ptr<cms::Topic> advisories( session->createTopic(
    "ActiveMQ.Advisory.TempTopic,ActiveMQ.Advisory.TempQueue" ) );

std::auto_ptr<cms::MessageConsumer> consumer;
consumer.reset( session->createConsumer( advisories.get() ) );
consumer->setMessageListener( this );

如上面的代码片段所示,我们只需创建一个新的主题对象,其名称是我们要订阅的两个主题的组合,这将使我们的单个 MessageConsumer 实例能够接收临时主题和临时队列咨询。与之前一样,我们还创建一个 MessageConsumer,并将我们类的实例注册为异步侦听器。现在剩下要做的就是实现 MessageListener 接口的 onMessage 方法,让我们看看这段代码。

处理包含嵌入式命令对象的咨询消息

////////////////////////////////////////////////////////////////////////////////
void TempDestinationAdvisoryConsumer::onMessage( const cms::Message* message ) {

    if( message->getCMSType() == "Advisory" ) {

        std::cout << "Received an Advisory Message!" << std::endl;

        const ActiveMQMessage* amqMessage =
            dynamic_cast<const ActiveMQMessage*>( message );

        if( amqMessage != NULL && amqMessage->getDataStructure() != NULL ) {
            std::cout << "Advisory Message contains a Command Object!" << std::endl;

            try {

                Pointer<DestinationInfo> info =
                    amqMessage->getDataStructure().dynamicCast<DestinationInfo>();

                unsigned char operationType = info->getOperationType();

                if( operationType == ActiveMQConstants::DESTINATION_REMOVE_OPERATION ) {
                    std::cout << "Temporary Destination {"
                              << info->getDestination()->getPhysicalName()
                              << "} Removed."
                              << std::endl;
                } else if( operationType == ActiveMQConstants::DESTINATION_ADD_OPERATION ) {
                    std::cout << "Temporary Destination {"
                              << info->getDestination()->getPhysicalName()
                              << "} Added."
                              << std::endl;
                } else {
                    std::cout << "ERROR: I have no Idea what just happened!"
                              << std::endl;
                }

            } catch( ClassCastException& ex ) {
                std::cout << "ERROR: Expected the Command to be a DestinationInfo, "
                          << "it wasn't so PANIC!!"
                          << std::endl;
            }
        }

    } else {
        std::cout << "Received a Non-Advisory Message!" << std::endl;
    }
}

幸运的是,上面的代码看起来比实际复杂,让我们更详细地逐行分析,以便了解发生了什么。

获取 ActiveMQMessage 对象

if( message->getCMSType() == "Advisory" ) {

    std::cout << "Received an Advisory Message!" << std::endl;

    const ActiveMQMessage* amqMessage =
        dynamic_cast<const ActiveMQMessage*>( message );

    ... Other scary code comes next...

else {
    std::cout << "Received a Non-Advisory Message!" << std::endl;
}

我们首先需要做的是检查我们是否接收了咨询消息,ActiveMQ 将消息类型编码为“Advisory”,因此这很容易。我们不需要在这里执行此操作,因为我们的消费者只监听咨询主题,但检查一下也不错。一旦我们知道它是一个咨询消息,我们就知道消息指针应该是 ActiveMQMessage 类型,它隐藏在通用的 cms::Message 伪装之下,因此我们使用 dynamic_cast 将其转换为它。现在我们已经转换为 ActiveMQMessage,接下来要做什么?让我们看看。

检查是否有嵌入式命令对象

if( amqMessage != NULL && amqMessage->getDataStructure() != NULL ) {
    std::cout << "Advisory Message contains a Command Object!" << std::endl;

每个 ActiveMQMessage 派生对象都有一个名为 getDataStructure 的方法,该方法可用于各种有用的用途,这里我们试图查看此消息中是否包含命令对象,并且您猜对了,getDataStructure 方法将告诉我们是否包含命令对象。如果有,我们可以继续检查 DestinationInfo 对象。

获取 DestinationInfo 对象

try {

    Pointer<DestinationInfo> info =
        amqMessage->getDataStructure().dynamicCast<DestinationInfo>();

    unsigned char operationType = info->getOperationType();

    if( operationType == ActiveMQConstants::DESTINATION_REMOVE_OPERATION ) {
        std::cout << "Temporary Destination {"
                  << info->getDestination()->getPhysicalName()
                  << "} Removed."
                  << std::endl;
    } else if( operationType == ActiveMQConstants::DESTINATION_ADD_OPERATION ) {
        std::cout << "Temporary Destination {"
                  << info->getDestination()->getPhysicalName()
                  << "} Added."
                  << std::endl;
     } else {
        std::cout << "ERROR: I have no Idea what just happened!"
                  << std::endl;
     }

} catch( ClassCastException& ex ) {
    std::cout << "ERROR: Expected the Command to be a DestinationInfo, "
              << "it wasn't so PANIC!!"
              << std::endl;
}

您可能首先要问这段代码片段的“什么是 Pointer?”,这是一种线程安全的智能指针,ActiveMQ-CPP 在内部使用它来管理构成 cms::Message 对象的所有指针,以及其他事情。我们创建了一个 Pointer 实例类型,它将指向我们的 DestinationInfo 命令(如果 dynamicCast 方法能够进行该转换),否则将抛出 ClassCastException。一旦我们拥有了 DestinationInfo 指针,我们就可以检索命令的操作类型,然后将其与 ActiveMQConstants 中的常量进行比较,以查看对目标进行了什么操作。只有两种操作类型:添加和删除,但由于 DestinationInfo 对象将操作类型值编码为无符号字符,因此我们提供了一个回退情况来提醒我们发生此错误。我们快完成了,剩下要做的就是输出发生了什么,以及让用户知道目标的名称,Destination 类中的 **getPhysicalName** 方法告诉我们这一点。您还可以使用 Destination 对象找出目标是主题还是队列,我们将把这留给读者作为练习。

完整示例:监听临时目标创建和销毁的消费者

我们客户端应用程序的完整代码如下所示,您也可以在源代码分发包的示例文件夹中找到此代码以及一个创建临时主题和临时队列的简单客户端。

TempDestinationAdvisoryConsumer 头文件

#ifndef _ACTIVEMQCPP_EXAMPLES_ADVISORIES_TEMPDESTINATIONADVISORYCONSUMER_H_
#define _ACTIVEMQCPP_EXAMPLES_ADVISORIES_TEMPDESTINATIONADVISORYCONSUMER_H_

#include <string>
#include <memory>

#include <cms/Session.h>
#include <cms/MessageProducer.h>
#include <cms/MessageConsumer.h>
#include <cms/MessageListener.h>

#include <decaf/lang/Runnable.h>

namespace activemqcpp {
namespace examples {
namespace advisories {

    /**
     * Monitors a Broker for Temporary Topic creation and destruction.
     *
     * @since 3.0
     */
    class TempDestinationAdvisoryConsumer : public cms::MessageListener {
    private:

        cms::Session* session;
        std::auto_ptr<cms::MessageConsumer> consumer;

    public:

        TempDestinationAdvisoryConsumer( cms::Session* session );
        virtual ~TempDestinationAdvisoryConsumer();

        /**
         * Async Message callback.
         */
        virtual void onMessage( const cms::Message* message );

    };

}}}

#endif /* _ACTIVEMQCPP_EXAMPLES_ADVISORIES_TEMPDESTINATIONADVISORYCONSUMER_H_ */

TempDestinationAdvisoryConsumer 源文件

#include "TempDestinationAdvisoryConsumer.h"

#include <cms/Topic.h>
#include <cms/Message.h>
#include <cms/TextMessage.h>
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/commands/ActiveMQMessage.h>
#include <activemq/commands/DestinationInfo.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/lang/exceptions/ClassCastException.h>
#include <decaf/lang/Integer.h>

using namespace std;
using namespace activemqcpp;
using namespace activemqcpp::examples;
using namespace activemqcpp::examples::advisories;
using namespace activemq;
using namespace activemq::commands;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;

////////////////////////////////////////////////////////////////////////////////
TempDestinationAdvisoryConsumer::TempDestinationAdvisoryConsumer( cms::Session* session ) {

    if( session == NULL ) {
        throw NullPointerException(
            __FILE__, __LINE__, "Session Object passed was Null." );
    }

    std::auto_ptr<cms::Topic> advisories( session->createTopic(
        "ActiveMQ.Advisory.TempTopic,ActiveMQ.Advisory.TempQueue" ) );

    this->session = session;
    this->consumer.reset( session->createConsumer( advisories.get() ) );
    this->consumer->setMessageListener( this );
}

////////////////////////////////////////////////////////////////////////////////
TempDestinationAdvisoryConsumer::~TempDestinationAdvisoryConsumer() {
}

////////////////////////////////////////////////////////////////////////////////
void TempDestinationAdvisoryConsumer::onMessage( const cms::Message* message ) {

    if( message->getCMSType() == "Advisory" ) {

        std::cout << "Received an Advisory Message!" << std::endl;

        const ActiveMQMessage* amqMessage =
            dynamic_cast<const ActiveMQMessage*>( message );

        if( amqMessage != NULL && amqMessage->getDataStructure() != NULL ) {
            std::cout << "Advisory Message contains a Command Object!" << std::endl;

            try {

                Pointer<DestinationInfo> info =
                    amqMessage->getDataStructure().dynamicCast<DestinationInfo>();

                unsigned char operationType = info->getOperationType();

                if( operationType == ActiveMQConstants::DESTINATION_REMOVE_OPERATION ) {
                    std::cout << "Temporary Destination {"
                              << info->getDestination()->getPhysicalName()
                              << "} Removed."
                              << std::endl;
                } else if( operationType == ActiveMQConstants::DESTINATION_ADD_OPERATION ) {
                    std::cout << "Temporary Destination {"
                              << info->getDestination()->getPhysicalName()
                              << "} Added."
                              << std::endl;
                } else {
                    std::cout << "ERROR: I have no Idea what just happened!"
                              << std::endl;
                }

            } catch( ClassCastException& ex ) {
                std::cout << "ERROR: Expected the Command to be a DestinationInfo, "
                          << "it wasn't so PANIC!!"
                          << std::endl;
            }
        }

    } else {
        std::cout << "Received a Non-Advisory Message!" << std::endl;
    }
}

Apache、ActiveMQ、Apache ActiveMQ、Apache 羽毛标识和 Apache ActiveMQ 项目标识是 Apache 软件基金会的商标。版权所有 © 2024,Apache 软件基金会。根据 Apache 许可证 2.0 许可。