Интеграция Spring с Rabbit AMQP для «Клиент отправляет сообщение -> Сервер получает и возвращает сообщения в очереди возврата -> Клиент получает коррелированные сообщения»

Я могу написать Java-программу, используя Rabbit Java API, выполняя следующие действия:

Клиент отправляет сообщение по обмену / очереди Rabbit MQ с идентификатором корреляции (например, UUID - «348a07f5-8342-45ed-b40b-d44bfd9c4dde»).

Сервер получает сообщение.

Сервер отправляет ответное сообщение по обмену / очереди Rabbit MQ с тем же идентификатором корреляции - «348a07f5-8342-45ed-b40b-d44bfd9c4dde».

Клиент получил коррелированное сообщение только в той же ветке, что и 1.

Ниже приведены Send.java и Recv.java с использованием API-интерфейсов Rabbit. Мне нужна помощь, чтобы преобразовать этот пример, чтобы использовать интеграцию Spring AMQP, особенно получение части на шаге 4. Я ищу что-то вроде метода приема, который может фильтровать сообщения, используя Id корреляции.

Send.java:
import java.util.UUID;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Send {

    private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
    private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
        String message = "Hello World!";
        String cslTransactionId = UUID.randomUUID().toString();
        BasicProperties properties = (new BasicProperties.Builder())
            .correlationId(cslTransactionId)
            .replyTo(RESPONSE_QUEUE).build();

        channel.basicPublish("", REQUEST_QUEUE, properties, message.getBytes());

        System.out.println("Client Sent '" + message + "'");


        Channel responseChannel = connection.createChannel();
        responseChannel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        responseChannel.basicConsume(RESPONSE_QUEUE, false, consumer);
        String correlationId = null;
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String responseMessage = new String(delivery.getBody());
            correlationId = delivery.getProperties().getCorrelationId();
            System.out.println("Correlation Id:" + correlationId);
            if (correlationId.equals(cslTransactionId)) {
                    System.out.println("Client Received '" + responseMessage + "'");
                responseChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
                break;
            }
        }

        channel.close();
        connection.close();
    }
}
Recv.java
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;

public class Recv {

    private final static String REQUEST_QUEUE = "REQUEST.QUEUE";
    private final static String RESPONSE_QUEUE = "RESPONSE.QUEUE";

    public static void main(String[] argv) throws Exception {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(REQUEST_QUEUE, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(REQUEST_QUEUE, true, consumer);
        String correlationId = null;
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            correlationId = delivery.getProperties().getCorrelationId();
            System.out.println("Correlation Id:" + correlationId);
            System.out.println("Server Received '" + message + "'");
            if (correlationId != null)
                break;
            }

            String responseMsg = "Response Message";
            Channel responseChannel = connection.createChannel();
            responseChannel.queueDeclare(RESPONSE_QUEUE, false, false, false, null);
            BasicProperties properties = (new BasicProperties.Builder())
            .correlationId(correlationId).build();

            channel.basicPublish("", RESPONSE_QUEUE, properties,responseMsg.getBytes());

            System.out.println("Server Sent '" + responseMsg + "'");

            channel.close();
            connection.close();
       }
}

После запуска конфигурации Java, предоставленной gary, я пытаюсь переместить конфигурацию в формат XML для добавления на сервер прослушивателя. Ниже приведена конфигурация XML:

server.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean 
        id="serviceListenerContainer"
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="queues" ref="requestQueue"/>
            <property name="messageListener" ref="messageListenerAdaptor"/>
    </bean>

    <bean id="messageListenerAdaptor"
        class="org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter">
        <property name="delegate" ref="pojoListener" />
    </bean>

    <bean 
        id="pojoListener"
        class="PojoListener"/>

    <bean
        id="replyListenerContainer"
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="queues" ref="replyQueue"/>
        <property name="messageListener" ref="fixedReplyQRabbitTemplate"/>
    </bean>

    <!-- Infrastructure -->
    <rabbit:connection-factory 
        id="connectionFactory" 
        host="localhost" 
        username="guest" 
        password="guest" 
        cache-mode="CHANNEL" 
        channel-cache-size="5"/>

    <rabbit:template 
        id="fixedReplyQRabbitTemplate" 
        connection-factory="connectionFactory"
        exchange="fdms.exchange"
        routing-key="response.key"
        reply-queue="RESPONSE.QUEUE">
        <rabbit:reply-listener/>
    </rabbit:template>

    <rabbit:admin connection-factory="connectionFactory"/>

    <rabbit:queue id="requestQueue" name="REQUEST.QUEUE" />
    <rabbit:queue id="replyQueue" name="RESPONSE.QUEUE" />

    <rabbit:direct-exchange name="fdms.exchange" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="RESPONSE.QUEUE" key="response.key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>
SpringReceive.java
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.support.ClassPathXmlApplicationContext;


public class SpringReceive {

/**
 * @param args
 */
public static void main(String[] args) {
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("cslclient.xml");
    SimpleMessageListenerContainer serviceListenerContainer =     context.getBean("serviceListenerContainer", SimpleMessageListenerContainer.class);
    serviceListenerContainer.start();
    }
}

Ответы на вопрос(1)

Ваш ответ на вопрос