OpenWire CPP 客户端
连接性 > 跨语言客户端 > ActiveMQ Classic C++ 客户端 > OpenWire CPP 客户端
OpenWire C++ 客户端
目标
我们希望能够为 ActiveMQ Classic 提供一个 C++ API,该 API 保留与 Java API 的大致功能等效性,同时通过开放事件系统的较低级别,为应用程序设计提供更大的灵活性。我们已经设计出一个 API/编程模型,最大限度地提高消息传递库适应使用它的应用程序的方式,而不是相反。具体来说,我们的目标是
- 不对包含应用程序的线程约束做出任何假设(具体来说,允许在单线程应用程序中异步接收)
- 不对包含应用程序的事件循环特性做出任何假设
- 不对所需的库做出任何假设 - 也就是说,不要强制使用特定的库或智能指针实现
- 在灵活性与应用程序复杂性之间提供折衷方案
- 为异步事件通知提供明显 C++ 友好的 API
- 提供可插拔的数据传输
- 最大限度地提高可移植性
它不对包含应用程序的线程或事件循环模型做出任何假设,因此不会对 C++ 程序设计施加任何限制,因为异步消息传递固有的成本。由于这种灵活和可插拔的设计,这个库特别容易作为高层脚本语言接口到 ActiveMQ Classic 的本机层来实现,因为它对语言的线程功能没有假设。
当我们使用“编程模型”这个术语时,我们指的是处理异步消息传递 API 最复杂的部分 - 将消息传递到应用程序。
JMS/Java 编程模型
JMS 1.1 为应用程序提供两种接收消息的方法。一种是在 MsgConsumer 类中的 receive() 函数中进行阻塞接收。这将挂起调用线程,直到收到一条消息。另一种方法是通过 MessageListener 子类中的回调。当在创建 MessageListener 的主题或队列上收到一条消息时,会调用 OnMessage 函数。
总而言之 - JMS 模型对应用程序开发人员施加了线程结构。他们要么必须使用多个线程,每个线程都执行同步 receive() 调用,要么依赖一个单独的线程来将消息传递给他们。单线程模型是不可能的。
C++ 和 Java API 最适合的地方出现分歧
JMS API 对应用程序强制实施了特定的线程模型。这在 Java 编程空间中很好,并且被广泛接受,因为 Java 的设计是为了让多路复用通信与线程变得最容易。线程在 Java 中非常容易且得到很好的支持。
但是,在 C++ 中,常见的做法不同。大多数处理以事件驱动方式进行网络通信的 C++ 程序使用事件循环。这是一个线程,它等待多个文件描述符上的活动,依赖操作系统在其中一个有数据时唤醒它。文件描述符是事件驱动 C 和 C++ 程序的通用语言,因为操作系统使得使用它们来执行任何类型的事件变得很容易,并且还提供了丰富的功能来同时等待多个文件描述符。在 C 和 C++ 中,线程容易出错(因为语言中没有并发特性),并且当使用足够灵活的事件循环时,通常是不必要的。由此产生的结果是,异步消息传递系统最优化的 C+ API 是一个在 C++ 程序的这种常见做法中工作的 API,而不是施加任意的限制。特别是,即使是与应用程序没有共享任何数据或代码的后台线程也会影响它,因为多个线程和 UNIX 信号的交互没有明确定义。
总之,C++ 开发人员在使用可以传递事件的库时期望一定程度的开放性和灵活性,以便他们能够将多个事件源集成在一起,而无需诉诸多线程。大多数正确执行此操作的库将网络连接的底层文件描述符直接公开给应用程序 - 例如 X11 就是这样做的 - 我们将从实际的设计规范开始。
C++ 的更低级特性比 Java 呈现出更多固有的设计决策。特别是,最大的问题之一是,没有标准的智能指针实现。Boost 很流行,但每个企业都有自己的,每个开发人员都有个人偏好。出于这个原因,这个库不建议使用任何特定的智能指针实现,因为它对企业强加了设计决策,而这些企业在该领域有自己的要求。通过谨慎使用引用和 API 语义,我们可以使内存所有权足够明确,以避免陷阱。实际上,API 策略是,所有返回的指针都由应用程序负责释放。这应该使它足够明确。
我们提议采用三层方法来提供应用程序简单性和灵活性之间的折衷方案。最低层将做最少的事情,并允许最广泛的应用程序使用它,下一层将更具功能性,但有一些设计权衡,最高层将留给企业特定的需求。
建议的 C++ API 设计
我们提议的设计包含两层。核心库不拥有任何线程,只充当过滤器 - 从代理获取数据,并在必要时调度消息。该库的核心层被公开,以便那些不想使用除自己的线程以外的任何线程来管理事件,或者想要使用非 TCP 方法与代理进行通信的 C++ 开发人员可以使用它。与 ActiveMQ Classic 代理的套接字将由传输层公开给应用程序,并且在该套接字上接收的数据将由应用程序不透明地传递给核心库。这允许使用单线程程序进行异步消息传递 - 对于某些开发人员来说,这是一个硬性要求。
这种方法有一些缺点。如果应用程序在其他某些活动上阻塞,并且从未处理代理发来的数据,则 TCP 套接字将备份,并且消息将积压在代理端。此外,该库的单线程性质意味着应用程序必须自己完成所有 I/O 工作,并且实现一个单独的线程才能有效地使用阻塞接收。
我们可以在第一层之上构建第二层(我们称之为 BrokerSession),它提供这些功能 - 仍然集成到应用程序现有的事件结构中,但以后台线程为代价。后台线程处理来自 ActiveMQ Classic 的数据并将其传递给核心库,将任何生成的的消息放入内部消息缓冲区(或者可能是多个 - 参见下面的详细信息)。有了这个,该库可以为应用程序提供一个更简单、更直接的消息传递接口,因为它以应用程序使用核心库的方式来处理代理通信本身。
第二层只是调用第一层的内部实例来执行任何与 ActiveMQ Classic 相关的消息传递任务 - 没有特定于消息传递系统的代码在核心低级库之外。因此,第二层很薄 - 只是线程、与传输抽象层的交互以及对核心库的功能直通。
有一个第三层 - 需要更高层的应用程序框架库来进行消息传递,这些库可能从用户那里获取所有控制流,并提供尽可能简单的环境。较低级别旨在允许第三级别的设计和需求具有最大的灵活性,而第三级别通常是企业特定的,超出了本文档的范围。该级别的一个库示例是提供一个框架,使编写完全基于回调的应用程序变得容易。由于这将需要从一开始就为应用程序设计它,因此在这个级别上,选择智能指针实现等设计决策将是合适的。对于特定的企业,大多数应用程序可能会使用类似的东西,这将作为 BrokerSession 库的一个薄包装来实现。
传输层抽象概述
与 ActiveMQ Classic 本身类似,我们提供了一个传输层抽象,它管理文件描述符上的连接和 I/O。最初只提供 TCP 套接字支持,但其他实现也有价值 - 进程内管道、从文件回放、UDP 等。这层提供逻辑连接、发送、接收和关闭操作。传输以与其他 ActiveMQ Classic 传输相同的方式使用 URI 初始化。
核心库概述
该库的核心层提供了一个非常简单的接口 - 数据输入,消息输出。
当应用程序的事件循环从传输层的 connect() 调用的返回文件描述符中读取活动时,应用程序读取可用数据并将该数据传递给该库。该库本身有一个内部缓冲区,当完全累积一条消息时,该缓冲区将传递该消息。这种分离的目的是,核心库不会进行任何阻塞 I/O 调用 - 或者进行任何系统调用。这满足了可移植性目标,并确保单线程应用程序可以使用此库而不会潜在失去对线程的控制。
使用核心库发送的消息的端到端生命周期是
- 用户调用“发送”或“订阅”等。
- 核心库构造相应的 OpenWire 命令对象
- 核心库将此对象编组到一个缓冲区中,并将该缓冲区返回给用户
使用核心库接收消息的端到端生命周期是
- 用户从 ActiveMQ Classic 获取数据,可能使用包含的传输库
-
该数据被传递给核心库,该库执行以下两项操作之一
- 如果它还没有接收消息,它会查看前几个字节,并通过检查 OpenWire 标头来记录传入消息的大小。
- 如果是,它将传入缓冲区添加到内部消息缓冲区,并且如果此添加完成了传入消息,它将将其反序列化为一个 OpenWire 命令对象,并采取适当的操作。
这里值得注意的是消息接收的行为 - 核心库能够分段接收消息,并且仅在完全接收消息后才传递消息。
发布和订阅以传统方式工作,只是它们返回要由传输层发送的数据,而不是直接发送数据。这允许更高层确切地确定数据如何发送 - 例如,它允许使用非阻塞 I/O。
应用程序提供两种消息接收选项。库通过一个或多个内部“消息消费者”对象(类似于 JMS MessageConsumer 对象)来实现这一点。有两种类型 - 一种用于同步接收,另一种用于异步接收。同步消费者(BlockingMessageConsumer)提供一个阻塞接收,多个线程可以等待消息。异步消息消费者使用基于事件的接口,通过写入管道中的一个字节来通知应用程序消息已准备好。NonBlockingMessageConsumer 类可以分配其事件文件描述符以及接收的消息 - 异步 receive() 调用是非阻塞的,如果消息未准备好,则返回 NULL。
BrokerSession 库概述
由于拥有一个内部线程来处理来自 ActiveMQ Classic 代理的数据,因此更高级别的库的接口为开发人员提供了一个比第一层更简单、更直接的 API。
由于 BrokerSession 库包含核心库的副本,因此 ActiveMQ Classic 特定的代码被封装在其中。BrokerSession 库中的代码负责为应用程序管理核心库和代理通信。使用后台线程来执行此操作并将传入的消息传递给消息消费者极大地简化了应用程序开发。
与核心库一样,消息分发由应用程序明确完成,而不是由后台线程完成。这两个库都不存储消息回调 - 不需要使用它们。这确保只有调用应用程序已知的线程执行应用程序代码 - 结果是不会强制或要求线程安全编程(满足不强制特定线程模型的目标)。通常会在其前面添加企业特定的代码来进行更自定义的消息传递(例如,可能使用回调)并不奇怪。
该库允许使用多个线程等待消息队列的 JMS 类编程模型,以及 C++ 程序中更常见的基于事件的选择循环模型。这使得它能够适应新的和现有的 C++ 应用程序的不同需求,这些应用程序受除消息库以外的其他因素的设计约束。
日志记录和错误处理
为了符合不强制特定依赖关系并允许轻松集成企业的目标,这些库具有基于回调的日志记录模型。应用程序向库注册一个日志记录器,以便在发生日志事件时调用 - 该日志记录器具有用于错误日志记录、调试日志记录等的函数回调。这允许最轻松地与现有的日志记录系统集成,因为这些回调随后可以直接调用到本机日志记录环境(例如 syslog)或记录到文件,或者任何在特定应用程序中合适的其他东西。
错误处理略有不同。在 C++ 中进行错误处理的本机方法是通过异常。异常非常适合核心库这样的东西,因此当发生错误时,它将向调用者抛出一个 ActiveMQ::Exception 实例。但是,在 BrokerSession 库中,这样的异常可能会被后台线程捕获。在这种情况下,该线程将调用应用程序回调来传递异常。这确保所有错误都到达应用程序,并且可以适当地处理。
异常处理的默认回调只是将相关消息打印到标准错误。