Apache ActiveMQ Artemis 核心是一个具有其自身 API 的消息传递系统。我们称之为核心 API

如果您不想使用 JMS API 或任何其他支持的协议,您可以直接使用核心 API。核心 API 提供了 JMS 的所有功能,但没有那么多复杂性。它还提供了使用 JMS 不可用的功能。

1. 核心消息传递概念

一些核心消息传递概念类似于 JMS 概念,但核心消息传递概念在某些方面也存在差异。总的来说,核心 API 比 JMS API 更简单,因为我们消除了队列、主题和订阅之间的区别。我们将依次讨论每个主要的核心消息传递概念,但要详细了解 API,请参阅 Javadoc。

另请参阅 地址模型 章节,了解这些概念的概述以及配置详细信息。

1.1. 消息

  • 消息是客户端和服务器之间发送的数据单元。

  • 消息有一个主体,它是一个包含方便的方法的缓冲区,用于读取和写入数据到它。

  • 消息具有一组属性,这些属性是键值对。每个属性键都是一个字符串,属性值可以是整数、长整数、短整数、字节、字节数组、字符串、双精度浮点数、单精度浮点数或布尔值类型。

  • 消息有一个它要发送到的地址。当消息到达服务器时,它将被路由到绑定到该地址的任何队列。路由语义(即 anycast 或 multicast)由地址和队列的“路由类型”确定。如果队列绑定了任何过滤器,则消息将仅在过滤器匹配的情况下才会路由到该队列。一个地址可能绑定了多个队列,甚至可能没有绑定任何队列。除了队列之外,还可能绑定到地址的其他实体(例如divert)。

  • 消息可以是持久性的,也可以是非持久性的。持久队列中的持久消息将在服务器崩溃或重启后保留。非持久性消息永远不会在服务器崩溃或重启后保留。

  • 消息可以指定一个介于 0 到 9 之间的优先级值。0 表示最低优先级,9 表示最高优先级。代理将尝试在低优先级消息之前传递高优先级消息。

  • 消息可以指定一个可选的过期时间。代理不会在消息过期时间超过后传递消息。

  • 消息还有一个可选的时间戳,它表示消息发送的时间。

  • Apache ActiveMQ Artemis 还支持发送/使用比任何单个可用 RAM 中所能容纳的更大的消息。

2. 核心 API

2.1. ServerLocator

客户端使用ServerLocator 实例来创建ClientSessionFactory 实例。ServerLocator 实例用于定位服务器并创建到服务器的连接。

在 JMS 术语中,将ServerLocator 视为 JMS Connection Factory 一样。

ServerLocator 实例是使用ActiveMQClient 工厂类创建的。

2.2. ClientSessionFactory

客户端使用ClientSessionFactory 实例来创建ClientSession 实例。ClientSessionFactory 实例基本上是到服务器的连接。

在 JMS 术语中,将它们视为 JMS 连接。

ClientSessionFactory 实例是使用ServerLocator 类创建的。

2.3. ClientSession

客户端使用ClientSession 来使用和生成消息以及将它们分组到事务中。ClientSession 实例可以支持事务性和非事务性语义,还可以提供XAResource 接口,以便消息操作可以作为 JTA 事务的一部分执行。

ClientSession 实例对ClientConsumer 实例和ClientProducer 实例进行分组。

ClientSession 实例可以注册到一个可选的SendAcknowledgementHandler。这允许您的客户端代码在发送的消息成功到达服务器时异步收到通知。此独特的 Apache ActiveMQ Artemis 功能允许您完全保证发送的消息已到达服务器,而无需在每个发送的消息上阻塞,直到收到响应。在每个发送的消息上阻塞成本很高,因为它需要为每个发送的消息进行网络往返。通过不阻塞并异步接收发送确认,您可以创建真正的端到端异步系统,这使用标准 JMS API 是不可能的。有关此高级功能的更多信息,请参阅部分 发送和提交的保证

2.3.1. 为管理和调试识别您的客户端应用程序

为您的核心会话分配 ID 可以帮助您使用 管理控制台 进行监控和调试,例如

ServerLocator locator = ...
ClientSessionFactory csf = createSessionFactory(locator);
ClientSession session = csf.createSession(null, null, false, true, true, locator.isPreAcknowledge(), locator.getAckBatchSize(), "my-client-id");

然后值my-client-id 将出现在连接使用者生产者选项卡下的客户端 ID 列中。

如果您使用的是 JMS API,则setClientID 会为您提供相同的效果。

2.4. ClientConsumer

客户端使用ClientConsumer 实例从队列中使用消息。核心消息传递支持同步和异步消息使用语义。ClientConsumer 实例可以配置一个可选的过滤器表达式,并且只会使用与该表达式匹配的消息。

2.5. ClientProducer

客户端在ClientSession 实例上创建ClientProducer 实例,以便他们可以发送消息。ClientProducer 实例可以指定一个所有发送的消息都将被路由到的地址,或者它们可以没有指定的地址,而地址在发送时为消息指定。

请注意,ClientSessionClientProducerClientConsumer 实例是旨在重复使用的。

为每个您生成或使用的消息创建新的ClientSessionClientProducerClientConsumer 实例是一种反模式。如果您这样做,您的应用程序的性能将非常差。这在性能调整部分中进行了更详细的讨论 性能调整

3. 使用核心的一个简单示例

这是一个使用核心消息传递 API 发送和接收消息的非常简单的程序。在逻辑上,它包含两个部分:首先设置生产者将消息写入地址,其次,使用 anycast 路由为使用者创建队列,创建使用者并启动它。

ServerLocator locator = ActiveMQClient.createServerLocator("vm://0");

// In this simple example, we just use one session for both producing and receiving

ClientSessionFactory factory =  locator.createSessionFactory();
ClientSession session = factory.createSession();

// A producer is associated with an address ...

ClientProducer producer = session.createProducer("example");
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("Hello");

// We need a queue attached to the address ...

session.createQueue("example", RoutingType.ANYCAST, "example", true);

// And a consumer attached to the queue ...

ClientConsumer consumer = session.createConsumer("example");

// Once we have a queue, we can send the message ...

producer.send(message);

// We need to start the session before we can -receive- messages ...

session.start();
ClientMessage msgReceived = consumer.receive();

System.out.println("message = " + msgReceived.getBodyBuffer().readString());

session.close();