代码概述
架构
以下部分将介绍 Apache ActiveMQ Classic 的主要部分,并链接到代码以帮助您了解布局。

JMS 客户端
The org.apache.activemq 包定义了核心 JMS 客户端。
传输
JMS 客户端和消息代理使用 Transport 抽象来发送命令对象(就像分布式的命令模式)。TransportChannel 通常处理某种网络机制(使用 BIO 的 TCP 套接字,使用 NIO,UDP / 多播,SSL over sockets,JXTA,EmberIO 等)。有关更多详细信息,请参见 org.apache.activemq.transport 包。
因此,TransportChannel 基本上负责发送和接收 Command 对象(每个实例代表某种命令)。Packet 在 org.apache.activemq.command 包中定义,该包定义了所有 JMS 消息实现类(它们是命令)以及许多其他类型的包,例如订阅,消息确认,事务等等。
线格式
有多种可能的将消息编码到流上的方法。我们可能希望适应各种不同的编码机制 - 例如,为与 C / JavaScript 交谈提供更简单的线格式,或者进行 C# 友好的编码。
因此,所有 Transport 实现都采用可插拔的 WireFormat 实现类 - 这是用于决定如何将命令写入 DataIn / DataOut 流或数据报的策略模式。
因此,如果您希望提供自己的二进制线上传输协议,那么我们只需要一个您协议的 WireFormat 实现,然后我们就可以将其与任何传输一起使用(TCP BIO,NIO,JXTA 等)。
我们默认使用 OpenWireFormat,这是从 Java 代码使用最有效且最简单的格式 - 所以如果线路的两端都是 Java,那么强烈推荐它。虽然其他 WireFormats 也很受欢迎。
默认线格式
默认线格式写入一个字节,指示正在发送的命令类型(请参阅 CommandTypes 接口,该接口定义了每种命令类型的所有 int 常量)。
核心 JMS 消息类型每个都有一个唯一的字节 ID,用于
- 消息
- ObjectMessage
- TextMessage
- MapMessage
- BytesMessage
- StreamMessage
然后,除此之外还有各种其他 类型的命令,例如
- ConnectionInfo,用于建立与消息代理的新连接
- ConsumerInfo,当在连接上创建新的消费者时
- MessageAck,用于确认消息 ID
- TransactionInfo,用于表示事务
还有一些其他的;org.apache.activemq.command 包以其恐怖的细节描述了它们。
基本上,DefaultWireFormat 对这些命令中的每一个都有一个默认编码。因此,在写入指示数据包类型的第一个字节之后,每个数据包类型都有一个特定的线格式。
对于新的线格式,您可能只需要支持这些类型中的一小部分。例如,您可能只拥有一个简单的发布消息,消费消息和消息确认。
消息代理
消息代理(JMS 客户端的服务器端)的 API 定义在 org.apache.activemq.broker 中。还有其他几个包定义了不同的部分,从消息存储到消息路由等等。
要查看这些包的概述,请尝试 JavaDocs
ActiveMQ Classic 系统概述
介绍
ActiveMQ Classic 是负责创建和管理用于客户端和代理之间通信的网络连接的系统。本文档希望概述此系统的内部工作原理,以便将来开发人员更容易理解。它将提供系统的高级概述,并概述主要参与者。我们还将介绍一些其他有趣的类,这些类可能对其他正在开发该系统的人员有用。本文档的大部分内容都是针对服务器端代码编写的。这是因为客户端通信系统在架构上很简单,并且与之相比,理解服务器将使理解客户端变得微不足道。
我们假设读者对 JMS 有基本了解。有关更多信息,请参阅官方 Java 文档。
概述:主要参与者
参与 ActiveMQ Classic 通信系统的核心类是 Transports。这些包括 Transport,TransportServer 和 TransportFactory 层次结构。 Transport 和 TransportServer 分别是套接字和服务器套接字的包装器。 TransportFactory(正如您可能猜到的那样)是用于创建 Transport 和 TransportServer 的工厂。 Transport 连接到 Broker 并传输 Command,它们代表 ActiveMQ Classic 要执行的所有主要操作(稍后将详细介绍)。以下示例说明了这些部分如何组合在一起。
创建 JMS“提供者”应用程序所需的主要类是 Broker 类。默认的 ActiveMQ Classic 二进制文件将使用 BrokerService 类来包装 Broker。当应用程序启动时,它将实例化一个 BrokerService 并指示它绑定到特定的(本地)地址,例如“tcp://:61616”。 Broker 将使用给定地址中的方案,并找到合适的 TransportFactory,在本例中为 TcpTransportFactory。然后,此工厂将用于创建一个 TcpTransportServer,该服务器将绑定到“localhost:61616”。 TransportServer 启动后,它将不断轮询其套接字以查找传入连接。成功连接的传入套接字将被包装在 TcpTransport 实例中,并(间接地)传递回 Broker。然后,Broker 将开始轮询新的 Transport 以查找要处理的传入 Command。
上面示例中缺少的最后部分是 TransportConnection 和 TransportConnector 类。这些类用于分别将 Broker 连接到 Transport 和 TransportServer。
类详细信息
本部分将分别解释上述类中的一些更有趣的细节。
Transports、TransportServers 和 TransportFactories
这些类如何工作的基本原理非常简单: Transport 和 TransportServer 是用于隐藏实现的套接字和服务器套接字的包装器,TransportFactory 是用于上述类的工厂类。唯一的注意事项是如何根据提供给它们的 URI 选择和配置 TransportFactory。
TransportFactory 类是抽象类,无法直接创建 Transport 或 TransportServer 类。但是,它是用于创建 Transport 和 TransportServer 的类。 TransportFactory 将其职责委托给其子类,这些子类由 FactoryFinder 类提供,该类使用 URI 的方案根据存储在 META-INF 目录下的文本文件查找匹配的工厂类。
创建的 Transport 的配置完全通过反射完成。 Transport 通过调用 compositeConfigure 来配置,该方法由工厂在创建 Transport 时进行调用。 compositeConfigure 使用 IntrospectionSupport 类来调用通过 URI 传递的参数的 setter。例如,使用 URI“ssl://:61616/?needClientAuth=true”创建 Transport 将导致创建 SslTransport 对象,其 setNeedClientAuth 方法(如果存在)将在创建后立即使用 true 的值进行调用。 TransportServer 的操作方式类似。唯一的区别是调用 IntrospectionSupport 是从 TransportFactory 的 doBind 方法进行的。
命令
Command 是 Broker 内部通信的主要手段。每个 Command 代表要执行的操作。 Command 子类包括 ConnectionInfo,KeepAliveInfo 和 Message,它们分别导致处理新连接,维护旧连接以及处理用户消息。这些类使用 Marshalers 从 Transport 反序列化。每当在套接字中找到新数据时,都会读取第一个字节以确定接收到的 Command 类型。然后选择适当的 Marshaller 来反序列化 Command(例如,要反序列化 ConnectionInfo,将使用 ConnectionInfoMarshaller)。
TransportConnections 和 TransportConnectors
每个 TransportServer 都通过 TransportConnector 连接到一个 Broker。服务器的接受监听器(当构建新的 Transport 时调用)被设置为使用新的 Transport 调用给定的 TransportConnector 的 createConnection 方法。当被调用时,createConnection 会创建一个新的 TransportConnection,将给定的 Transport 和支持的 Broker 连接在一起;Transport 的传输监听器被设置为 TransportConnection 的 onCommand 方法,该方法在接收到新的 Command 时会被调用。
Command 和 AbstractConnection(TransportConnection 的超类)构成了一个访问者模式。onCommand 会调用 AbstractConnection 的服务方法,该方法将根据访问者模式进行一系列调用,最终将正确的 Command 子类传递给 Broker 的相应方法进行处理。
BrokerFilters 和 BrokerPlugins
虽然 BrokerFilter 和 BrokerPlugin 没有被通信系统直接使用,但它们提供了一种有效且易于使用的方式来修改 Broker 行为。 BrokerFilter 允许修改一些 Broker 方法,而无需触及其他方法(顾名思义)。BrokerFilter 将其所有职责传递给它在构造函数中接收到的 Broker。对 BrokerFilter 进行子类化,允许我们在将工作传递给底层 Broker 之前执行额外的操作。
BrokerFilter 类的强大之处在于,可以级联多个过滤器来创建不同的功能组合。例如,JaasAuthenticationBroker 是 BrokerFilter 的一个子类,它修改了用于添加和删除连接的方法,以允许 JAAS 身份验证。AuthorizationBroker 是 BrokerFilter 的另一个子类。该类修改了目标监管方法以执行访问级别。借助这种架构,可以创建一个 JaasAuthenticationBroker,并让它使用一个 AuthorizationBroker 作为其底层代理(该代理本身会使用另一个代理等等)。
BrokerPlugin 是简单的类,它们会将其对应的 Brokers 包装在它们被赋予的那个 Broker 周围。例如,在现有 Broker 上“安装” AuthorizationPlugin 会创建一个 AuthorizationBroker,该代理在内部使用原始的 Broker。BrokerPlugin 存在的首要原因是允许人们配置 BrokerService 类使用的 Broker(通过代码或 XML 配置和 Spring)。
