CMS API 概述
什么是 CMS?
CMS API 是 Java 中 JMS API 的 C++ 对应物,用于在网络上或同一台机器上的客户端之间发送和接收消息。在 CMS 中,我们尽一切努力尽可能地保持与 JMS api 的一致性,只有在 JMS 功能严重依赖 Java 编程语言本身的功能时才有所偏离。尽管存在一些差异,但大多数差异都非常小,而且在大多数情况下,CMS 遵循 JMS 规范,因此牢牢掌握 JMS 的工作原理将使使用 CMS 变得更加容易。
如果您已经熟悉 JMS,那么首先需要查看 CMS 的 API 文档
CMS 入门
本节介绍使用 CMS API 的基础知识。为了便于讨论,我们在此假设您使用 ActiveMQ-CPP 连接到 ActiveMQ Broker,当然,使用 CMS,您也可以在您的 C++ 应用程序中链接到 CMS API 的另一个实现并连接到其他消息服务。
CMS ConnectionFactory
您在 CMS API 中通常使用的第一个接口是 ConnectionFactory。ConnectionFactory 允许您创建 CMS Connection 对象,这些对象与某些消息服务(例如 ActiveMQ 代理)保持连接。
获取 CMS ConnectionFactory 实例的最简单方法是使用所有 CMS 提供程序库都必须实现的静态方法 createCMSConnectionFactory。下面的代码片段演示了如何获取新的 ConnectionFactory
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
如您所见,createCMSConnectionFactory 接受一个字符串参数,该参数以 URI 的形式表示,该 URI 定义了创建的连接的连接位置以及应使用的协议,在本例中为 TCP/IP。此外,配置信息也可以编码在 URI 中。有关可以通过 URI 传递给 ActiveMQ-CPP 的配置参数的更多信息,请参阅 配置 页面。
创建 ConnectionFactory 后,下一步是使用 ConnectionFactory 创建 CMS Connection。Connection 是管理客户端与提供程序连接的对象。下一节将介绍 CMS Connection 的使用,创建连接的代码如下所示
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
创建后,Connection 对象尝试连接到 CMS 提供程序,如果连接失败,则会抛出 CMSException,其消息属性中存储了发生的错误的描述。
连接和身份验证
createConnection 方法有几个版本,允许您为新创建的连接指定登录参数。您最常使用的一个版本接受用户名和密码对,它们被传输到代理以进行身份验证。如果凭据无效,将抛出 CMSException。下面的示例代码展示了如何在创建 Connection 对象时传递用户名和密码。
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection( "<USERNAME>", "<PASSWORD>") );
如果您不想将值硬编码到源代码中或编写代码从其他地方读取登录数据,则还有另一种方法可以传递用户名和密码,即传递给 createConnectionFactory 的 URI 可以被编码,以便连接工厂在解析 URI 时从系统环境中读取这些值。下面的示例展示了如何使用 URI 中设置的登录数据创建连接工厂。
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616?username=${USERNAME}&password=${PASSWORD}" ) );
如您所见,让 URI 上的值来自系统环境非常简单。此方法适用于您可以在 URI 中指定的任何参数。
CMS Connection
CMS Connection 接口定义了一个对象,该对象是客户端与 CMS 提供程序的活动连接。在大多数情况下,客户端只会创建一个连接对象,因为它被认为是一个重量级对象。
连接有几个用途
- 它封装了与 JMS 提供程序的打开连接。它通常表示客户端和提供程序服务守护进程之间打开的 TCP/IP 套接字。
- 它的创建是客户端身份验证发生的地方。
- 它可以指定唯一的客户端标识符。
- 它提供 ConnectionMetaData 对象。
- 它支持可选的 ExceptionListener 对象。
CMS Connection 是从 CMS ConnectionFactory 创建的,正如我们之前所见。如果 ConnectionFactory 创建调用成功,则返回的 Connection 对象将连接到 CMS 提供程序。Connection 对象在停止状态下创建,在连接启动之前,不会将任何消息传递给客户端创建的消息使用者。通常情况下,将连接保持在停止状态,直到客户端创建了它打算使用的初始会话集、消息生产者和消息使用者集。客户端的设置阶段完成后,它应该调用 Connection 的 start 方法以开始从提供程序接收消息。未能调用 start 方法是 CMS 和 JMS 客户端新手非常常见的错误,如果您发现没有收到任何消息,首先要检查的是是否调用了 start。
创建 Connection 后,客户端必须创建 CMS Session 才能创建消息生产者和使用者。下面的代码片段将我们到目前为止看到的代码整合在一起,然后展示了如何从 Connection 实例创建 CMS Session 对象,接下来的部分将更详细地讨论 CMS Session。
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
std::auto_ptr<cms::Session> session( connection->createSession() );
CMS Session
成功创建 CMS Connection 后,您通常会使用新的 Connectio 实例创建一个或多个 CMS Session 对象。Session 被定义为用于生产和使用消息的单线程上下文。
会话有几个用途
- 它是其消息生产者和使用者的工厂。
- 它提供提供程序优化的消息工厂。
- 它是临时主题和临时队列的工厂。
- 它为那些需要动态操作提供程序特定目标名称的客户端提供了一种创建队列或主题对象的方法。
- 它支持一系列跨其生产者和消费者工作的原子单元的单一事务。
- 它定义了它使用的消息和它产生的消息的串行顺序。
- 它保留它使用的消息,直到它们被确认。
- 它序列化注册到其消息消费者的消息监听器的执行。
- 它是 QueueBrowser 的工厂(尚未实现)。
一个会话可以创建和服务多个消息生产者和使用者。
当客户端创建 CMS 会话时,它必须指定会话确认接收和调度消息的模式。支持的模式总结在下表中。
确认模式 | 描述 |
---|---|
AUTO_ACKNOWLEDGE | 使用此确认模式,会话在从接收调用成功返回后或会话调用的消息监听器成功处理消息并返回后,会自动确认客户端已收到消息。 |
CLIENT_ACKNOWLEDGE | 使用此确认模式,客户端通过调用消息的 acknowledge 方法确认已使用消息。确认已使用消息会确认会话已使用的所有消息。使用客户端确认模式时,客户端可能会在尝试处理消息时累积大量未确认消息。CMS 提供程序应为管理员提供一种限制客户端过载的方法,以防止客户端在使用的一些资源暂时被阻塞时出现资源耗尽和随之而来的故障。 |
DUPS_OK_ACKNOWLEDGE | 此确认模式指示会话延迟确认消息的传递。如果 JMS 提供程序发生故障,这可能会导致传递一些重复消息,因此只有在使用者可以容忍重复消息时才应使用它。使用此模式可以通过最小化会话为防止重复而执行的工作来减少会话开销。 |
SESSION_TRANSACTED | 会话是事务性的,消息的确认由内部处理。 |
INDIVIDUAL_ACKNOWLEDGE | 确认仅应用于单个消息。与 CLIENT_ACKNOWLEDGE 不同的是,CLIENT_ACKNOWLEDGE 的确认适用于整个会话到目前为止接收的所有消息,而此模式仅适用于单个消息,允许客户端更具选择性地确认哪些消息。 |
在上一节中,我们展示了如何创建会话,让我们在这里再次看一下该示例
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
std::auto_ptr<cms::Session> session( connection->createSession() );
在此代码片段中,会话是使用没有任何参数的 createSession 创建的,这会创建一个处于 AUTO_ACKNOWLEDGE 模式下的会话。要创建具有上述模式之一的会话,CMS Session 接口中有一个第二个创建方法,它接受一个指定模式的单个参数,让我们来看一个示例
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
std::auto_ptr<cms::Session> session( connection->createSession( cms::Session::CLIENT_ACKNOWLEDGE ) );
如您所见,这里没有太大区别,只需指定您想要的确认模式,您就可以创建会话资源,在接下来的几节中,我们将介绍您可以从会话中创建的对象类型,并向您展示使用它们的 基础知识。
从 CMS Session 创建的对象
在本节中,我们将介绍从 CMS Session 对象实例创建的对象类型。
CMS 消息
顾名思义,CMS 存在于发送和接收消息,因此从介绍您可以使用 CMS 发送和接收的消息开始很有意义。在撰写本文时,CMS 支持四种主要的 Message 类型,其他类型可能会在将来加入,但我们将重点介绍目前完全支持的类型。下表显示了消息类型及其使用简要说明,有关特定消息类型接口和用法的完整详细信息,请参阅 CMS API 文档。
消息类型 | 描述 |
---|---|
消息 | Message 接口定义了最简单的 CMS 消息。与其他 CMS 消息不同,Message 类型没有主体或有效负载,但它可以通过包含 C++ 中基本类型的属性设置器集合来设置属性。Message 接口是 CMS 中所有 Message 类型的根。 |
TextMessage | TextMessage 类承载一个有效负载,该有效负载包含一个 C++ 字符串。TextMessage 接口扩展了 Message 接口,添加了用于设置和获取有效负载文本的方法,并保留了对设置消息属性的支持。由于 Java 对象不能直接从 JMS 客户端发送到 CMS 客户端,因此文本消息是将对象序列化为 XML 并将其发送到 JMS 客户端的理想方式。 |
BytesMessage | BytesMessage 的有效负载包含一系列不间断的字节,接收者负责解释这些字节。BytesMessage 为标准的 Message 接口方法添加了用于获取和设置字节数组的方法。 |
MapMessage | Map 消息的有效负载是一组名称/值对。名称为 C++ 字符串类型,值为 C++ 原生类型或字符串。 |
StreamMessage | Stream 消息主体包含一系列自描述的原生类型。StreamMessage 接口提供访问器方法,可以读写消息的原生类型。读方法允许原生类型转换,如果转换不会导致数据丢失。 |
现在我们已经了解了可以创建的消息类型,让我们看看如何实际创建它们,并探索 Message 类中提供的一些操作。
创建消息
正如你可能已经猜到的,消息是使用我们之前创建的 CMS 会话实例创建的。会话提供了用于创建我们上面讨论的四种消息类型的方法。会话是创建 CMS 中定义的消息接口的提供程序实现的工厂,它知道如何配置内部数据结构并防止客户端直接绑定到提供程序实现,这就是为什么我们必须使用会话来创建消息对象而不是直接创建它们的原因。让我们看看一个创建 TextMessage 实例并在该消息上设置一些属性的代码片段。
// Create the ConnectionFactory
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
// Create a Connection
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
// Create a new Session from our Connection
std::auto_ptr<cms::Session> session( connection->createSession() );
// Now create a TextMessage
std::auto_ptr<cms::TextMessage> textMessage( session->createTextMessage() );
// Set the payload
textMessage->setText( "Payload Text" );
// Set some Properties
textMessage->setStringProperty( "USER_NAME", "Steve" );
textMessage->setIntProperty( "USER_CODE", 42 );
从上面的代码可以看出,创建 TextMessage 很像创建会话或连接实例,你只需在你的 CMS 会话实例中调用 **createTextMessage**,你就会得到一个新的 TextMessage 指针,然后你可以用文本和属性填充它。
CMS 目的地
顾名思义,CMS 目的地接口定义了一个对象,该对象表示消息被消息代理路由到的端点。客户端创建目的地并将消息发送到它们,或者等待接收在他们订阅的目的地上的消息。CMS 中有两种基本类型的目的地:主题和队列,它们有两种子类型:临时主题和临时队列。下表总结了四种不同的目的地类型。
目的地类型 | 描述 |
主题 | 在 CMS 中,主题实现了发布和订阅语义。当你发布消息时,消息会发送到所有感兴趣的订阅者 - 因此从 0 到多个订阅者将收到消息的副本。只有在代理收到消息时拥有活动订阅的订阅者才会收到消息的副本。 |
队列 | CMS 队列实现了负载均衡语义。一条消息将被正好一个消费者接收。如果在消息发送时没有可用的消费者,它将被保留,直到有可用的消费者可以处理该消息。如果消费者收到消息,并且在关闭之前没有确认消息,那么消息将被重新发送给另一个消费者。一个队列可以有许多消费者,消息在可用的消费者之间负载均衡。 |
临时主题 | 临时主题对象是一个独特的主题对象,为连接的持续时间创建。它是一个系统定义的主题,只能由创建它的连接使用。 |
临时队列 | 临时队列对象是一个独特的队列对象,为连接的持续时间创建。它是一个系统定义的队列,只能由创建它的连接使用。 |
现在我们已经了解了目的地类型,让我们看看一个代码片段,展示如何创建目的地对象。下面的示例展示了如何使用现在应该已经很熟悉的模式来创建主题实例。
// Create the ConnectionFactory
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
// Create a Connection
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
// Create a new Session from our Connection
std::auto_ptr<cms::Session> session( connection->createSession() );
// Now create a Topic
std::auto_ptr<cms::Topic> myTopic( session->createTopic( "EXAMPLE-TOPIC" ) );
创建主题或队列需要为目的地传递一个名称,名称类似于地址,发送到“EXAMPLE-TOPIC”目的地的消息被订阅了相同目的地的客户端接收。
CMS 消息消费者
现在我们已经了解了如何创建消息和目的地,我们将看看如何创建 CMS 消息消费者。消息消费者允许客户端应用程序接收其他客户端发送到主题或队列的消息。在我们讨论使用消费者接收消息之前,让我们先看看如何创建消费者。
// Create the ConnectionFactory
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
// Create a Connection
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
// Create a new Session from our Connection
std::auto_ptr<cms::Session> session( connection->createSession() );
// Now create a Topic
std::auto_ptr<cms::Topic> myTopic( session->createTopic( "EXAMPLE-TOPIC" ) );
// Now create the Consumer
std::auto_ptr<cms::MessageConsumer> myConsumer( session->createConsumer( myTopic ) );
如你所见,消息消费者是通过从 CMS 会话对象实例调用 **createConsumer** 来创建的。消息消费者在创建时被赋予一个目的地来监听消息。一旦你创建了一个消息消费者,它就该开始使用它来接收消息了,消息消费者有两种接收消息的方法,一种是同步的,另一种是异步的。
接收消息的同步方法涉及调用消费者的 receive 方法。如果未指定超时,则调用 receive 将阻塞,直到在相关目的地接收消息,或者如果给定的超时时间已过并且没有新消息到达,则返回 NULL。让我们看一下使用 CMS 的简单同步消息轮询循环的示例。
while( !done ) {
std::auto_ptr<Message> message( myConsumer->receive() );
...Do Something with the message...
}
如你所见,我们在上面的代码中调用了 MessageConsumer 的 **receive** 方法,期望它会在某个时间点返回一个新的 Message 对象。这里的代码将阻塞,直到收到新的消息。这种方法假设你的程序在收到消息之前不需要做任何其他事情,但是如果你的应用程序需要执行其他处理,则有一种替代方案。下面的代码示例展示了一个轮询循环,它使用 MessageConsumer 的 **receiveNoWait** 方法来轮询并立即返回,以便执行其他处理。
```while( !done ) {
std::auto_ptr
if( message.get() != NULL ) { …对消息进行处理… }
…在检查另一条消息之前执行其他应用程序逻辑… }
The asynchronous method involves implementing the CMS MessageListener interface and passing an instance of your implementation to the MessageConsumer's **setMessageListener** method. When a new message arrives your listener's **onMessage** method will be called by the consumer in the context of another thread to allow you to process the Message received. Below is a code snippet that demonstrates implementing the MessageListener interface.
class SimpleListener : public cms::MessageListener {
virtual void onMessage( const Message* message ) {
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = "";
if( textMessage != NULL ) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}
printf( "Message Received: %s\n", text.c_str() );
} }; ```
在上面的示例中,我们创建了一个名为 SimpleListener 的新类,当收到 TextMessage 时打印其内容,或者打印一条消息,表明它没有收到 TextMessage,因为它预期会收到。注意,**onMessage** 方法接收指向基本 Message 接口的指针,然后我们尝试动态转换为我们认为应该接收的类型。这允许你的代码在一个方法中处理多种消息类型。传递的指针由调用者或 onMessage 拥有,因此你不应该存储或删除它,如果你需要保留 Message 的副本,则必须通过调用 Message 的 **clone** 方法创建副本。
现在我们有了 MessageListener 实现,它可以使用时间来查看如何使用之前创建的 MessageConsumer 设置异步消费。
SimpleListener listener;
myConsumer->setMessageListener( &listener );
就是这样,现在我们将收到发送到我们之前在 SimpleListener 实例的 onMessage 方法中创建的目的地上的消息。
CMS 消息生产者
我们已经了解了如何消费消息,现在我们如何产生它们呢?答案是 CMS 消息生产者,它用于将消息发送到代理,以分发给在主题或队列上监听消息的各种客户端。创建 MessageProducer 与创建 MessageConsumer 非常相似,你首先创建你的连接、会话和目的地对象,然后使用会话创建 MessageProducer。下面的代码片段演示了如何创建 MessageProducer。
// Create the ConnectionFactory
std::auto_ptr<cms::ConnectionFactory> connectionFactory(
cms::ConnectionFactory::createCMSConnectionFactory( "tcp://127.0.0.1:61616" ) );
// Create a Connection
std::auto_ptr<cms::Connection> connection( connectionFactory->createConnection() );
// Create a new Session from our Connection
std::auto_ptr<cms::Session> session( connection->createSession() );
// Now create a Topic
std::auto_ptr<cms::Topic> myTopic( session->createTopic( "EXAMPLE-TOPIC" ) );
// Now create the Consumer
std::auto_ptr<cms::MessageProducer> myProducer( session->createProducer( myTopic ) );
完整示例
现在我们已经了解了 CMS API 的大部分基础知识,是时候看看一些完整的示例,这些示例演示了如何在自己的应用程序中使用 CMS API。第一个示例将展示如何创建一个简单的异步消费者,它可以从 ActiveMQ 代理接收 TextMessage 对象,然后在第二个示例中,我们将看看一个简单的生产者,它将 TextMessage 对象发布到我们的消费者正在监听的目的地。
简单的异步消费者
在简单的异步消费者示例中,我们将 CMS API 的使用封装在一个名为 SimpleAsyncConsumer 的类中。这个类公开了一个构造函数,允许用户创建类的实例,该实例连接到特定的代理和目的地,以及目的地是队列还是主题。用户还可以指定确认模式应该为 **CLIENT_ACKNOWLEDGE** 而不是默认的 **AUTO_ACKNOWLEDGE** 模式。
一旦创建了这个类的实例,用户就调用 *runConsumer* 方法开始监听指定的目的地。*runConsumer* 方法创建与代理的连接并启动一个新的会话,该会话使用配置的确认模式配置。一旦创建了会话,就可以创建一个新的消费者并将其附加到配置的目的地。由于我们希望异步监听新消息,因此 SimpleAsyncConsumer 继承自 cms::MessageListener,以便它可以将自己注册为创建于 *runConsumer* 中的 MessageConsumer 的消息监听器。
*runConsumer* 方法返回后,主方法等待用户输入以退出,应用程序运行期间收到的所有消息都将分派到 SimpleAsyncConsumer 的 onMessage 方法,如果消息是 TextMessage,则其内容将打印在屏幕上。
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
////////////////////////////////////////////////////////////////////////////////
class SimpleAsyncConsumer : public ExceptionListener,
public MessageListener,
public DefaultTransportListener {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageConsumer* consumer;
bool useTopic;
std::string brokerURI;
std::string destURI;
bool clientAck;
private:
SimpleAsyncConsumer( const SimpleAsyncConsumer& );
SimpleAsyncConsumer& operator= ( const SimpleAsyncConsumer& );
public:
SimpleAsyncConsumer( const std::string& brokerURI,
const std::string& destURI,
bool useTopic = false,
bool clientAck = false ) :
connection(NULL),
session(NULL),
destination(NULL),
consumer(NULL),
useTopic(useTopic),
brokerURI(brokerURI),
destURI(destURI),
clientAck(clientAck) {
}
virtual ~SimpleAsyncConsumer() {
this->cleanup();
}
void close() {
this->cleanup();
}
void runConsumer() {
try {
// Create a ConnectionFactory
ActiveMQConnectionFactory* connectionFactory =
new ActiveMQConnectionFactory( brokerURI );
// Create a Connection
connection = connectionFactory->createConnection();
delete connectionFactory;
ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>( connection );
if( amqConnection != NULL ) {
amqConnection->addTransportListener( this );
}
connection->start();
connection->setExceptionListener(this);
// Create a Session
if( clientAck ) {
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
// Create a MessageConsumer from the Session to the Topic or Queue
consumer = session->createConsumer( destination );
consumer->setMessageListener( this );
} catch (CMSException& e) {
e.printStackTrace();
}
}
// Called from the consumer since this class is a registered MessageListener.
virtual void onMessage( const Message* message ) {
static int count = 0;
try
{
count++;
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
string text = "";
if( textMessage != NULL ) {
text = textMessage->getText();
} else {
text = "NOT A TEXTMESSAGE!";
}
if( clientAck ) {
message->acknowledge();
}
printf( "Message #%d Received: %s\n", count, text.c_str() );
} catch (CMSException& e) {
e.printStackTrace();
}
}
// If something bad happens you see it here as this class is also been
// registered as an ExceptionListener with the connection.
virtual void onException( const CMSException& ex AMQCPP_UNUSED ) {
printf("CMS Exception occurred. Shutting down client.\n");
exit(1);
}
virtual void transportInterrupted() {
std::cout << "The Connection's Transport has been Interrupted." << std::endl;
}
virtual void transportResumed() {
std::cout << "The Connection's Transport has been Restored." << std::endl;
}
private:
void cleanup(){
try {
if( connection != NULL ) {
connection->close();
}
} catch ( CMSException& e ) {
e.printStackTrace();
}
delete destination;
delete consumer;
delete session;
delete connection;
}
};
////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
activemq::library::ActiveMQCPP::initializeLibrary();
std::cout << "=====================================================\n";
std::cout << "Starting the example:" << std::endl;
std::cout << "-----------------------------------------------------\n";
// Set the URI to point to the IPAddress of your broker.
// add any optional params to the url to enable things like
// tightMarshalling or tcp logging etc. See the CMS web site for
// a full list of configuration options.
//
// https://activemq.apache.org/cms/
//
std::string brokerURI =
"failover:(tcp://127.0.0.1:61616)";
//============================================================
// This is the Destination Name and URI options. Use this to
// customize where the consumer listens, to have the consumer
// use a topic or queue set the 'useTopics' flag.
//============================================================
std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";
//============================================================
// set to true to use topics instead of queues
// Note in the code above that this causes createTopic or
// createQueue to be used in the consumer.
//============================================================
bool useTopics = false;
//============================================================
// set to true if you want the consumer to use client ack mode
// instead of the default auto ack mode.
//============================================================
bool clientAck = false;
// Create the consumer
SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );
// Start it up and it will listen forever.
consumer.runConsumer();
// Wait to exit.
std::cout << "Press 'q' to quit" << std::endl;
while( std::cin.get() != 'q') {}
// All CMS resources should be closed before the library is shutdown.
consumer.close();
std::cout << "-----------------------------------------------------\n";
std::cout << "Finished with the example." << std::endl;
std::cout << "=====================================================\n";
activemq::library::ActiveMQCPP::shutdownLibrary();
}
简单的生产者
与简单的异步消费者示例非常类似,简单的生产者示例将 CMS API 细节包装在一个名为 **SimpleProducer** 的类中,以创建生产者。这个类公开了一个与消费者示例类似的接口,有一个构造函数允许创建实例,其中包含代理和目的地的配置选项,以及要发送到配置目的地的消息数量。一旦创建,客户端代码只需调用 SimpleProducer 的 *run* 方法即可发布指定数量的消息。*run* 方法完成后,客户端可以自由关闭 SimpleProducer,这将清理分配的 CMS 资源,一旦关闭,应用程序就会退出。
简单的消息生产者示例
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Long.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <activemq/library/ActiveMQCPP.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>
#include <memory>
using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;
////////////////////////////////////////////////////////////////////////////////
class SimpleProducer : public Runnable {
private:
Connection* connection;
Session* session;
Destination* destination;
MessageProducer* producer;
bool useTopic;
bool clientAck;
unsigned int numMessages;
std::string brokerURI;
std::string destURI;
private:
SimpleProducer( const SimpleProducer& );
SimpleProducer& operator= ( const SimpleProducer& );
public:
SimpleProducer( const std::string& brokerURI, unsigned int numMessages,
const std::string& destURI, bool useTopic = false, bool clientAck = false ) :
connection(NULL),
session(NULL),
destination(NULL),
producer(NULL),
useTopic(useTopic),
clientAck(clientAck),
numMessages(numMessages),
brokerURI(brokerURI),
destURI(destURI) {
}
virtual ~SimpleProducer(){
cleanup();
}
void close() {
this->cleanup();
}
virtual void run() {
try {
// Create a ConnectionFactory
auto_ptr<ActiveMQConnectionFactory> connectionFactory(
new ActiveMQConnectionFactory( brokerURI ) );
// Create a Connection
try{
connection = connectionFactory->createConnection();
connection->start();
} catch( CMSException& e ) {
e.printStackTrace();
throw e;
}
// Create a Session
if( clientAck ) {
session = connection->createSession( Session::CLIENT_ACKNOWLEDGE );
} else {
session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
}
// Create the destination (Topic or Queue)
if( useTopic ) {
destination = session->createTopic( destURI );
} else {
destination = session->createQueue( destURI );
}
// Create a MessageProducer from the Session to the Topic or Queue
producer = session->createProducer( destination );
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
// Create the Thread Id String
string threadIdStr = Long::toString( Thread::currentThread()->getId() );
// Create a messages
string text = (string)"Hello world! from thread " + threadIdStr;
for( unsigned int ix=0; ix<numMessages; ++ix ){
TextMessage* message = session->createTextMessage( text );
message->setIntProperty( "Integer", ix );
// Tell the producer to send the message
printf( "Sent message #%d from thread %s\n", ix+1, threadIdStr.c_str() );
producer->send( message );
delete message;
}
}catch ( CMSException& e ) {
e.printStackTrace();
}
}
private:
void cleanup(){
try {
if( connection != NULL ) {
connection->close();
}
} catch ( CMSException& e ) {
e.printStackTrace();
}
delete destination;
delete producer;
delete session;
delete connection;
}
};
////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
activemq::library::ActiveMQCPP::initializeLibrary();
std::cout << "=====================================================\n";
std::cout << "Starting the example:" << std::endl;
std::cout << "-----------------------------------------------------\n";
// Set the URI to point to the IPAddress of your broker.
// add any optional params to the url to enable things like
// tightMarshalling or tcp logging etc. See the CMS web site for
// a full list of configuration options.
//
// https://activemq.apache.org/cms/
//
std::string brokerURI =
"failover://(tcp://127.0.0.1:61616)";
//============================================================
// Total number of messages for this producer to send.
//============================================================
unsigned int numMessages = 2000;
//============================================================
// This is the Destination Name and URI options. Use this to
// customize where the Producer produces, to have the producer
// use a topic or queue set the 'useTopics' flag.
//============================================================
std::string destURI = "TEST.FOO";
//============================================================
// set to true to use topics instead of queues
// Note in the code above that this causes createTopic or
// createQueue to be used in the producer.
//============================================================
bool useTopics = false;
// Create the producer and run it.
SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
// Publish the given number of Messages
producer.run();
// Before exiting we ensure that all CMS resources are closed.
producer.close();
std::cout << "-----------------------------------------------------\n";
std::cout << "Finished with the example." << std::endl;
std::cout << "=====================================================\n";
activemq::library::ActiveMQCPP::shutdownLibrary();
}