代码概述
架构
以下部分将介绍 Apache ActiveMQ Classic 的主要部分,并链接到代码以帮助您了解布局。
JMS 客户端
The org.apache.activemq 包定义了核心 JMS 客户端。
传输
JMS 客户端和消息代理使用 Transport 抽象来发送命令对象(就像分布式的命令模式)。TransportChannel 通常处理某种网络机制(使用 BIO 的 TCP 套接字,使用 NIO,UDP / 多播,SSL over sockets,JXTA,EmberIO 等)。有关更多详细信息,请参见 org.apache.activemq.transport 包。
因此,TransportChannel 基本上负责发送和接收 Command 对象(每个实例代表某种命令)。Packet 在 org.apache.activemq.command 包中定义,该包定义了所有 JMS 消息实现类(它们是命令)以及许多其他类型的包,例如订阅,消息确认,事务等等。
线格式
有多种可能的将消息编码到流上的方法。我们可能希望适应各种不同的编码机制 - 例如,为与 C / JavaScript 交谈提供更简单的线格式,或者进行 C# 友好的编码。
因此,所有 Transport 实现都采用可插拔的 WireFormat 实现类 - 这是用于决定如何将命令写入 DataIn / DataOut 流或数据报的策略模式。
因此,如果您希望提供自己的二进制线上传输协议,那么我们只需要一个您协议的 WireFormat 实现,然后我们就可以将其与任何传输一起使用(TCP BIO,NIO,JXTA 等)。
我们默认使用 OpenWireFormat,这是从 Java 代码使用最有效且最简单的格式 - 所以如果线路的两端都是 Java,那么强烈推荐它。虽然其他 WireFormats 也很受欢迎。
默认线格式
默认线格式写入一个字节,指示正在发送的命令类型(请参阅 CommandTypes 接口,该接口定义了每种命令类型的所有 int 常量)。
核心 JMS 消息类型每个都有一个唯一的字节 ID,用于
- 消息
- ObjectMessage
- TextMessage
- MapMessage
- BytesMessage
- StreamMessage
然后,除此之外还有各种其他 类型的命令,例如
- ConnectionInfo,用于建立与消息代理的新连接
- ConsumerInfo,当在连接上创建新的消费者时
- MessageAck,用于确认消息 ID
- TransactionInfo,用于表示事务
还有一些其他的;org.apache.activemq.command 包以其恐怖的细节描述了它们。
基本上,DefaultWireFormat 对这些命令中的每一个都有一个默认编码。因此,在写入指示数据包类型的第一个字节之后,每个数据包类型都有一个特定的线格式。
对于新的线格式,您可能只需要支持这些类型中的一小部分。例如,您可能只拥有一个简单的发布消息,消费消息和消息确认。
消息代理
消息代理(JMS 客户端的服务器端)的 API 定义在 org.apache.activemq.broker 中。还有其他几个包定义了不同的部分,从消息存储到消息路由等等。
要查看这些包的概述,请尝试 JavaDocs
ActiveMQ Classic 系统概述
介绍
ActiveMQ Classic 是负责创建和管理用于客户端和代理之间通信的网络连接的系统。本文档希望概述此系统的内部工作原理,以便将来开发人员更容易理解。它将提供系统的高级概述,并概述主要参与者。我们还将介绍一些其他有趣的类,这些类可能对其他正在开发该系统的人员有用。本文档的大部分内容都是针对服务器端代码编写的。这是因为客户端通信系统在架构上很简单,并且与之相比,理解服务器将使理解客户端变得微不足道。
我们假设读者对 JMS 有基本了解。有关更多信息,请参阅官方 Java 文档。
概述:主要参与者
参与 ActiveMQ Classic 通信系统的核心类是 Transports。这些包括 Transport
,TransportServer
和 TransportFactory
层次结构。 Transport
和 TransportServer
分别是套接字和服务器套接字的包装器。 TransportFactory
(正如您可能猜到的那样)是用于创建 Transport
和 TransportServer
的工厂。 Transport
连接到 Broker
并传输 Command
,它们代表 ActiveMQ Classic 要执行的所有主要操作(稍后将详细介绍)。以下示例说明了这些部分如何组合在一起。
创建 JMS“提供者”应用程序所需的主要类是 Broker
类。默认的 ActiveMQ Classic 二进制文件将使用 BrokerService
类来包装 Broker
。当应用程序启动时,它将实例化一个 BrokerService
并指示它绑定到特定的(本地)地址,例如“tcp://127.0.0.1:61616”。 Broker
将使用给定地址中的方案,并找到合适的 TransportFactory
,在本例中为 TcpTransportFactory
。然后,此工厂将用于创建一个 TcpTransportServer
,该服务器将绑定到“localhost:61616”。 TransportServer
启动后,它将不断轮询其套接字以查找传入连接。成功连接的传入套接字将被包装在 TcpTransport
实例中,并(间接地)传递回 Broker
。然后,Broker
将开始轮询新的 Transport
以查找要处理的传入 Command
。
上面示例中缺少的最后部分是 TransportConnection
和 TransportConnector
类。这些类用于分别将 Broker
连接到 Transport
和 TransportServer
。
类详细信息
本部分将分别解释上述类中的一些更有趣的细节。
Transports、TransportServers 和 TransportFactories
这些类如何工作的基本原理非常简单: Transport
和 TransportServer
是用于隐藏实现的套接字和服务器套接字的包装器,TransportFactory
是用于上述类的工厂类。唯一的注意事项是如何根据提供给它们的 URI 选择和配置 TransportFactory
。
TransportFactory
类是抽象类,无法直接创建 Transport
或 TransportServer
类。但是,它是用于创建 Transport
和 TransportServer
的类。 TransportFactory
将其职责委托给其子类,这些子类由 FactoryFinder
类提供,该类使用 URI 的方案根据存储在 META-INF 目录下的文本文件查找匹配的工厂类。
创建的 Transport
的配置完全通过反射完成。 Transport
通过调用 compositeConfigure
来配置,该方法由工厂在创建 Transport
时进行调用。 compositeConfigure
使用 IntrospectionSupport
类来调用通过 URI 传递的参数的 setter。例如,使用 URI“ssl://127.0.0.1:61616/?needClientAuth=true”创建 Transport
将导致创建 SslTransport
对象,其 setNeedClientAuth
方法(如果存在)将在创建后立即使用 true
的值进行调用。 TransportServer
的操作方式类似。唯一的区别是调用 IntrospectionSupport
是从 TransportFactory
的 doBind
方法进行的。
命令
Command
是 Broker
内部通信的主要手段。每个 Command
代表要执行的操作。 Command
子类包括 ConnectionInfo
,KeepAliveInfo
和 Message
,它们分别导致处理新连接,维护旧连接以及处理用户消息。这些类使用 Marshalers 从 Transport
反序列化。每当在套接字中找到新数据时,都会读取第一个字节以确定接收到的 Command
类型。然后选择适当的 Marshaller 来反序列化 Command
(例如,要反序列化 ConnectionInfo
,将使用 ConnectionInfoMarshaller
)。
TransportConnections 和 TransportConnectors
每个 TransportServer
都通过 TransportConnector
连接到一个 Broker
。服务器的接受监听器(当构建新的 Transport
时调用)被设置为使用新的 Transport
调用给定的 TransportConnector
的 createConnection
方法。当被调用时,createConnection
会创建一个新的 TransportConnection
,将给定的 Transport
和支持的 Broker
连接在一起;Transport
的传输监听器被设置为 TransportConnection
的 onCommand
方法,该方法在接收到新的 Command
时会被调用。
Command
和 AbstractConnection
(TransportConnection
的超类)构成了一个访问者模式。onCommand
会调用 AbstractConnection
的服务方法,该方法将根据访问者模式进行一系列调用,最终将正确的 Command
子类传递给 Broker
的相应方法进行处理。
BrokerFilters 和 BrokerPlugins
虽然 BrokerFilter
和 BrokerPlugin
没有被通信系统直接使用,但它们提供了一种有效且易于使用的方式来修改 Broker 行为。 BrokerFilter
允许修改一些 Broker
方法,而无需触及其他方法(顾名思义)。BrokerFilter
将其所有职责传递给它在构造函数中接收到的 Broker
。对 BrokerFilter
进行子类化,允许我们在将工作传递给底层 Broker
之前执行额外的操作。
BrokerFilter
类的强大之处在于,可以级联多个过滤器来创建不同的功能组合。例如,JaasAuthenticationBroker
是 BrokerFilter
的一个子类,它修改了用于添加和删除连接的方法,以允许 JAAS 身份验证。AuthorizationBroker
是 BrokerFilter
的另一个子类。该类修改了目标监管方法以执行访问级别。借助这种架构,可以创建一个 JaasAuthenticationBroker
,并让它使用一个 AuthorizationBroker
作为其底层代理(该代理本身会使用另一个代理等等)。
BrokerPlugin
是简单的类,它们会将其对应的 Brokers
包装在它们被赋予的那个 Broker
周围。例如,在现有 Broker
上“安装” AuthorizationPlugin
会创建一个 AuthorizationBroker
,该代理在内部使用原始的 Broker
。BrokerPlugin
存在的首要原因是允许人们配置 BrokerService
类使用的 Broker
(通过代码或 XML 配置和 Spring)。