代码概述

开发者 > 代码概述

架构

以下部分将介绍 Apache ActiveMQ Classic 的主要部分,并链接到代码以帮助您了解布局。

JMS 客户端

The org.apache.activemq 包定义了核心 JMS 客户端。

传输

JMS 客户端和消息代理使用 Transport 抽象来发送命令对象(就像分布式的命令模式)。TransportChannel 通常处理某种网络机制(使用 BIO 的 TCP 套接字,使用 NIO,UDP / 多播,SSL over sockets,JXTA,EmberIO 等)。有关更多详细信息,请参见 org.apache.activemq.transport 包。

因此,TransportChannel 基本上负责发送和接收 Command 对象(每个实例代表某种命令)。Packet 在 org.apache.activemq.command 包中定义,该包定义了所有 JMS 消息实现类(它们是命令)以及许多其他类型的包,例如订阅,消息确认,事务等等。

线格式

有多种可能的将消息编码到流上的方法。我们可能希望适应各种不同的编码机制 - 例如,为与 C / JavaScript 交谈提供更简单的线格式,或者进行 C# 友好的编码。

因此,所有 Transport  实现都采用可插拔的 WireFormat 实现类 - 这是用于决定如何将命令写入 DataIn / DataOut 流或数据报的策略模式

因此,如果您希望提供自己的二进制线上传输协议,那么我们只需要一个您协议的 WireFormat 实现,然后我们就可以将其与任何传输一起使用(TCP BIO,NIO,JXTA 等)。

我们默认使用 OpenWireFormat,这是从 Java 代码使用最有效且最简单的格式 - 所以如果线路的两端都是 Java,那么强烈推荐它。虽然其他 WireFormats 也很受欢迎。

默认线格式

默认线格式写入一个字节,指示正在发送的命令类型(请参阅 CommandTypes 接口,该接口定义了每种命令类型的所有 int 常量)。

核心 JMS 消息类型每个都有一个唯一的字节 ID,用于

  • 消息
  • ObjectMessage
  • TextMessage
  • MapMessage
  • BytesMessage
  • StreamMessage

然后,除此之外还有各种其他 类型的命令,例如

还有一些其他的;org.apache.activemq.command 包以其恐怖的细节描述了它们。

基本上,DefaultWireFormat 对这些命令中的每一个都有一个默认编码。因此,在写入指示数据包类型的第一个字节之后,每个数据包类型都有一个特定的线格式。

对于新的线格式,您可能只需要支持这些类型中的一小部分。例如,您可能只拥有一个简单的发布消息,消费消息和消息确认。

消息代理

消息代理(JMS 客户端的服务器端)的 API 定义在 org.apache.activemq.broker 中。还有其他几个包定义了不同的部分,从消息存储到消息路由等等。

要查看这些包的概述,请尝试 JavaDocs


ActiveMQ Classic 系统概述

介绍

ActiveMQ Classic 是负责创建和管理用于客户端和代理之间通信的网络连接的系统。本文档希望概述此系统的内部工作原理,以便将来开发人员更容易理解。它将提供系统的高级概述,并概述主要参与者。我们还将介绍一些其他有趣的类,这些类可能对其他正在开发该系统的人员有用。本文档的大部分内容都是针对服务器端代码编写的。这是因为客户端通信系统在架构上很简单,并且与之相比,理解服务器将使理解客户端变得微不足道。

我们假设读者对 JMS 有基本了解。有关更多信息,请参阅官方 Java 文档。

概述:主要参与者

参与 ActiveMQ Classic 通信系统的核心类是 Transports。这些包括 TransportTransportServerTransportFactory 层次结构。 TransportTransportServer 分别是套接字和服务器套接字的包装器。 TransportFactory(正如您可能猜到的那样)是用于创建 TransportTransportServer 的工厂。 Transport 连接到 Broker 并传输 Command,它们代表 ActiveMQ Classic 要执行的所有主要操作(稍后将详细介绍)。以下示例说明了这些部分如何组合在一起。

创建 JMS“提供者”应用程序所需的主要类是 Broker 类。默认的 ActiveMQ Classic 二进制文件将使用 BrokerService 类来包装 Broker。当应用程序启动时,它将实例化一个 BrokerService 并指示它绑定到特定的(本地)地址,例如“tcp://127.0.0.1:61616”。 Broker 将使用给定地址中的方案,并找到合适的 TransportFactory,在本例中为 TcpTransportFactory。然后,此工厂将用于创建一个 TcpTransportServer,该服务器将绑定到“localhost:61616”。 TransportServer 启动后,它将不断轮询其套接字以查找传入连接。成功连接的传入套接字将被包装在 TcpTransport 实例中,并(间接地)传递回 Broker。然后,Broker 将开始轮询新的 Transport 以查找要处理的传入 Command

上面示例中缺少的最后部分是 TransportConnectionTransportConnector 类。这些类用于分别将 Broker 连接到 TransportTransportServer

类详细信息

本部分将分别解释上述类中的一些更有趣的细节。

Transports、TransportServers 和 TransportFactories

这些类如何工作的基本原理非常简单: TransportTransportServer 是用于隐藏实现的套接字和服务器套接字的包装器,TransportFactory 是用于上述类的工厂类。唯一的注意事项是如何根据提供给它们的 URI 选择和配置 TransportFactory

TransportFactory 类是抽象类,无法直接创建 TransportTransportServer 类。但是,它是用于创建 TransportTransportServer 的类。 TransportFactory 将其职责委托给其子类,这些子类由 FactoryFinder 类提供,该类使用 URI 的方案根据存储在 META-INF 目录下的文本文件查找匹配的工厂类。

创建的 Transport 的配置完全通过反射完成。 Transport 通过调用 compositeConfigure 来配置,该方法由工厂在创建 Transport 时进行调用。 compositeConfigure 使用 IntrospectionSupport 类来调用通过 URI 传递的参数的 setter。例如,使用 URI“ssl://127.0.0.1:61616/?needClientAuth=true”创建 Transport 将导致创建 SslTransport 对象,其 setNeedClientAuth 方法(如果存在)将在创建后立即使用 true 的值进行调用。 TransportServer 的操作方式类似。唯一的区别是调用 IntrospectionSupport 是从 TransportFactorydoBind 方法进行的。

命令

CommandBroker 内部通信的主要手段。每个 Command 代表要执行的操作。 Command 子类包括 ConnectionInfoKeepAliveInfoMessage,它们分别导致处理新连接,维护旧连接以及处理用户消息。这些类使用 Marshalers 从 Transport 反序列化。每当在套接字中找到新数据时,都会读取第一个字节以确定接收到的 Command 类型。然后选择适当的 Marshaller 来反序列化 Command(例如,要反序列化 ConnectionInfo,将使用 ConnectionInfoMarshaller)。

TransportConnections 和 TransportConnectors

每个 TransportServer 都通过 TransportConnector 连接到一个 Broker。服务器的接受监听器(当构建新的 Transport 时调用)被设置为使用新的 Transport 调用给定的 TransportConnectorcreateConnection 方法。当被调用时,createConnection 会创建一个新的 TransportConnection,将给定的 Transport 和支持的 Broker 连接在一起;Transport 的传输监听器被设置为 TransportConnectiononCommand 方法,该方法在接收到新的 Command 时会被调用。

CommandAbstractConnectionTransportConnection 的超类)构成了一个访问者模式。onCommand 会调用 AbstractConnection 的服务方法,该方法将根据访问者模式进行一系列调用,最终将正确的 Command 子类传递给 Broker 的相应方法进行处理。

BrokerFilters 和 BrokerPlugins

虽然 BrokerFilterBrokerPlugin 没有被通信系统直接使用,但它们提供了一种有效且易于使用的方式来修改 Broker 行为。 BrokerFilter 允许修改一些 Broker 方法,而无需触及其他方法(顾名思义)。BrokerFilter 将其所有职责传递给它在构造函数中接收到的 Broker。对 BrokerFilter 进行子类化,允许我们在将工作传递给底层 Broker 之前执行额外的操作。

BrokerFilter 类的强大之处在于,可以级联多个过滤器来创建不同的功能组合。例如,JaasAuthenticationBrokerBrokerFilter 的一个子类,它修改了用于添加和删除连接的方法,以允许 JAAS 身份验证。AuthorizationBrokerBrokerFilter 的另一个子类。该类修改了目标监管方法以执行访问级别。借助这种架构,可以创建一个 JaasAuthenticationBroker,并让它使用一个 AuthorizationBroker 作为其底层代理(该代理本身会使用另一个代理等等)。

BrokerPlugin 是简单的类,它们会将其对应的 Brokers 包装在它们被赋予的那个 Broker 周围。例如,在现有 Broker 上“安装” AuthorizationPlugin 会创建一个 AuthorizationBroker,该代理在内部使用原始的 BrokerBrokerPlugin 存在的首要原因是允许人们配置 BrokerService 类使用的 Broker(通过代码或 XML 配置和 Spring)。

Apache, ActiveMQ, Apache ActiveMQ、Apache 羽毛标志和 Apache ActiveMQ 项目标志是 Apache 软件基金会的商标。版权所有 © 2024,Apache 软件基金会。根据 Apache 许可证 2.0 许可。