我应该如何使用 JMS 实现请求响应

 常见问题解答 > JMS > 我应该如何使用 JMS 实现请求响应

我应该如何使用 JMS 实现请求响应?

最简单的解决方案是使用 Camel 作为 Spring Remoting 提供者,它允许您将所有 JMS API 隐藏在业务逻辑中,并让 Camel 为您提供请求/响应处理代码。

但是,如果您希望自己编写 JMS 客户端代码,请继续阅读它是如何工作的…

使用 JMS API 实现请求响应

您最初可能认为,要在 JMS 中实现请求响应类型的操作,您应该为每个请求创建一个具有选择器的新的消费者;或者可能为每个请求创建一个新的临时队列。

创建临时目标、消费者、生产者和连接都是与代理的同步请求响应操作,因此应避免为每个请求进行处理,因为它会导致与 JMS 代理的很多通信。

使用 JMS 实现请求响应的最佳方法是在启动时为每个客户端创建一个临时队列和消费者,将每个消息的 JMSReplyTo 属性设置为临时队列,然后使用 每个消息上的相关 ID 来将请求消息与响应消息相关联。这避免了为每个请求创建和关闭消费者的开销(这很昂贵)。这也意味着您可以在需要时(或者可能将它们池化)跨多个线程共享相同的生产者和消费者。

Lingo 库 是使用 JMS 的 Spring 远程调用的实现。(Spring 远程调用是一种基于 POJO 的远程调用,其中远程调用代码对您的业务逻辑代码是不可见的)。

它恰恰使用了这种模式;使用相关 ID 将请求与响应相关联。服务器端只需记住将入站消息的相关 ID 放入响应中即可。

执行此操作的实际类是 MultiplexingRequestor。可能仅仅使用 Lingo 的 Spring 远程调用是实现请求响应的最简单方法 - 或者您可能只需要使用 Lingo 的 Requestor 接口来保留 JMS 语义。

更多详细信息 此处

客户端

因此,客户端在临时队列上创建消费者,如下所示…

// client side
Destination tempDest = session.createTemporaryQueue();
MessageConsumer responseConsumer = session.createConsumer(tempDest);
...

// send a request..
message.setJMSReplyTo(tempDest)
message.setJMSCorrelationID(myCorrelationID);

producer.send(message);

服务器端

public void onMessage(Message request) {

  Message response = session.createMessage();
  response.setJMSCorrelationID(request.getJMSCorrelationID())

  producer.send(request.getJMSReplyTo(), response)
}

完整示例

服务器端

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Server implements MessageListener {
    private static int ackMode;
    private static String messageQueueName;
    private static String messageBrokerUrl;

    private Session session;
    private boolean transacted = false;
    private MessageProducer replyProducer;
    private MessageProtocol messageProtocol;

    static {
        messageBrokerUrl = "tcp://127.0.0.1:61616";
        messageQueueName = "client.messages";
        ackMode = Session.AUTO_ACKNOWLEDGE;
    }

    public Server() {
        try {
            //This message broker is embedded
            BrokerService broker = new BrokerService();
            broker.setPersistent(false);
            broker.setUseJmx(false);
            broker.addConnector(messageBrokerUrl);
            broker.start();
        } catch (Exception e) {
            //Handle the exception appropriately
        }

        //Delegating the handling of messages to another class, instantiate it before setting up JMS so it
        //is ready to handle messages
        this.messageProtocol = new MessageProtocol();
        this.setupMessageQueueConsumer();
    }

    private void setupMessageQueueConsumer() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(messageBrokerUrl);
        Connection connection;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            this.session = connection.createSession(this.transacted, ackMode);
            Destination adminQueue = this.session.createQueue(messageQueueName);

            //Setup a message producer to respond to messages from clients, we will get the destination
            //to send to from the JMSReplyTo header field from a Message
            this.replyProducer = this.session.createProducer(null);
            this.replyProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //Set up a consumer to consume messages off of the admin queue
            MessageConsumer consumer = this.session.createConsumer(adminQueue);
            consumer.setMessageListener(this);
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    public void onMessage(Message message) {
        try {
            TextMessage response = this.session.createTextMessage();
            if (message instanceof TextMessage) {
                TextMessage txtMsg = (TextMessage) message;
                String messageText = txtMsg.getText();
                response.setText(this.messageProtocol.handleProtocolMessage(messageText));
            }

            //Set the correlation ID from the received message to be the correlation id of the response message
            //this lets the client identify which message this is a response to if it has more than
            //one outstanding message to the server
            response.setJMSCorrelationID(message.getJMSCorrelationID());

            //Send the response to the Destination specified by the JMSReplyTo field of the received message,
            //this is presumably a temporary queue created by the client
            this.replyProducer.send(message.getJMSReplyTo(), response);
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    public static void main(String\[\] args) {
        new Server();
    }
}

客户端

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.Random;

public class Client implements MessageListener {
    private static int ackMode;
    private static String clientQueueName;

    private boolean transacted = false;
    private MessageProducer producer;

    static {
        clientQueueName = "client.messages";
        ackMode = Session.AUTO_ACKNOWLEDGE;
    }

    public Client() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        Connection connection;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(transacted, ackMode);
            Destination adminQueue = session.createQueue(clientQueueName);

            //Setup a message producer to send message to the queue the server is consuming from
            this.producer = session.createProducer(adminQueue);
            this.producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            //Create a temporary queue that this client will listen for responses on then create a consumer
            //that consumes message from this temporary queue...for a real application a client should reuse
            //the same temp queue for each message to the server...one temp queue per client
            Destination tempDest = session.createTemporaryQueue();
            MessageConsumer responseConsumer = session.createConsumer(tempDest);

            //This class will handle the messages to the temp queue as well
            responseConsumer.setMessageListener(this);

            //Now create the actual message you want to send
            TextMessage txtMessage = session.createTextMessage();
            txtMessage.setText("MyProtocolMessage");

            //Set the reply to field to the temp queue you created above, this is the queue the server
            //will respond to
            txtMessage.setJMSReplyTo(tempDest);

            //Set a correlation ID so when you get a response you know which sent message the response is for
            //If there is never more than one outstanding message to the server then the
            //same correlation ID can be used for all the messages...if there is more than one outstanding
            //message to the server you would presumably want to associate the correlation ID with this
            //message somehow...a Map works good
            String correlationId = this.createRandomString();
            txtMessage.setJMSCorrelationID(correlationId);
            this.producer.send(txtMessage);
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    private String createRandomString() {
        Random random = new Random(System.currentTimeMillis());
        long randomLong = random.nextLong();
        return Long.toHexString(randomLong);
    }

    public void onMessage(Message message) {
        String messageText = null;
        try {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                messageText = textMessage.getText();
                System.out.println("messageText = " + messageText);
            }
        } catch (JMSException e) {
            //Handle the exception appropriately
        }
    }

    public static void main(String\[\] args) {
        new Client();
    }
}

协议类

此类需要运行上面的客户端/服务器示例。将消息处理委托给单独的类仅仅是个人偏好。

public class MessageProtocol {
    public String handleProtocolMessage(String messageText) {
        String responseText;
        if ("MyProtocolMessage".equalsIgnoreCase(messageText)) {
            responseText = "I recognize your protocol message";
        } else {
            responseText = "Unknown protocol message: " + messageText;
        }
        
        return responseText;
    }
}

Apache、ActiveMQ、Apache ActiveMQ、Apache 羽毛徽标和 Apache ActiveMQ 项目徽标是 Apache 软件基金会的商标。版权所有 © 2024,Apache 软件基金会。根据 Apache 许可证 2.0 授权。