Имеет ли значение, если вы не можете отправить результат преобразования в AMQP?
ичок в весенней интеграции и не понимаю, как отправлять сообщения об ошибках в назначенную очередь ошибок. Я хочу, чтобы сообщение об ошибке было заголовком исходного сообщения и попадало в отдельную очередь. Я прочитал, что это можно сделать с помощью заголовка, который я пытался реализовать, но в очереди ошибок ничего не появляется.
Кроме того, нужен ли мне отдельный класс обработки исключений, чтобы сообщения об ошибках попадали в очередь ошибок, или я могу просто генерировать исключения в моих методах преобразования?
Вот мой конфиг xml:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/amqp
http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory" host="bigdata-rdp" username="myuser" password="mypass" />
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue name="first" auto-delete="false" durable="true" />
<rabbit:queue name="second" auto-delete="false" durable="true" />
<rabbit:queue name="errorQueue" auto-delete="false" durable="true" />
<int:poller default="true" fixed-rate="100"/>
<rabbit:fanout-exchange name="second-exchange" auto-delete="true" durable="true">
<rabbit:bindings>
<rabbit:binding queue="second" />
</rabbit:bindings>
</rabbit:fanout-exchange>
<rabbit:fanout-exchange name="error-exchange" auto-delete="true" durable="true">
<rabbit:bindings>
<rabbit:binding queue="errorQueue" />
</rabbit:bindings>
</rabbit:fanout-exchange>
<int-amqp:outbound-channel-adapter channel="messageOutputChannel" exchange-name="second-exchange" amqp-template="amqpTemplate" />
<int-amqp:inbound-channel-adapter channel="messageInputChannel" error-channel="errorInputChannel" queue-names="first" connection-factory="connectionFactory" concurrent-consumers="20" />
<int-amqp:outbound-channel-adapter channel="errorOutputChannel" exchange-name="error-exchange" amqp-template="amqpTemplate" />
<int:channel id="messageInputChannel" />
<int:channel id="messageOutputChannel"/>
<int:channel id="errorInputChannel"/>
<int:service-activator input-channel="errorInputChannel" output-channel= "errorOutputChannel" method = "handleError" >
<bean class="firstAttempt.MessageErrorHandler"/>
<int:chain input-channel="messageInputChannel" output-channel="messageOutputChannel">
<int:header-enricher>
<int:error-channel ref="errorInputChannel" />
</int:header-enricher>
<int:transformer method = "convert" >
<bean class="firstAttempt.JsonObjectConverter" />
</int:transformer>
<int:service-activator method="transform">
<bean class="firstAttempt.Transformer" />
</int:service-activator>
<int:object-to-string-transformer />
</int:chain>
</beans>
Класс ошибки:
public class ErrorHandler {
public String errorHandle(MessageHandlingException exception) {
return exception.getMessage();
Класс QualityScorer (вызывается трансформатором):
public class QualityScorer {
private Hashtable<String, String> table;
private final static String csvFile = "C:\\Users\\john\\Test.csv";
public QualityScorer() throws Exception {
table = new Hashtable<String, String>();
initializeTable();
}
private void initializeTable() throws Exception {
BufferedReader br = null;
String line = "";
String cvsSplitBy = ",";
try {
br = new BufferedReader(new FileReader(csvFile));
while ((line = br.readLine()) != null) {
String[] data = line.split(cvsSplitBy);
if(data.length > 6 && data[1].equals("1") && data[4].equals("0") && data[5].equals("1"))
table.putIfAbsent(data[3], data[1]);
}
} catch (FileNotFoundException e) {
throw new Exception("No file found");
} catch (IOException e) {
e.printStackTrace();
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public float getScore(JSONObject object) throws Exception {
float score;
if (object == null) {
throw new IllegalArgumentException("object");
}
if (!object.has("source")) {
throw new Exception("Object does not have a source");
}
if (!object.has("employer")) {
throw new Exception("Object does not have an employer");
}
String source = object.getString("Source");
String employer = object.getString("employer");
if (table.containsKey(employer) && !source.equals("packageOne")) {
score = 1;
} else {
score = -1;
}
return score;
}
}
Прямо сейчас загружаемое сообщение не имеет источника, поэтому программа должна выдать исключение MessagingException в MessageErrorHandler.
Код трансформатора:
public class Transformer {
private QualityScorer qualityScorer;
public Transformer() throws Exception {
qualityScorer = new QualityScorer();
}
public JSONObject transform(JSONObject object) throws Exception {
float score = qualityScorer.getScore(object);
object.put("score", score);
return object;
}
}
Все вместе, программа должна получить предварительно загруженное сообщение из очереди, преобразовать его и отправить во вторую очередь, что она делает успешно, если в предварительно загруженном сообщении указан источник. Я пытаюсь обработать ошибки и сделать так, чтобы они отправлялись в очередь ошибок как заголовок сообщения. Эта проблема меня давно расстраивала, поэтому помощь очень ценится!
В настоящее время в стеке трассировки отображается ошибка:
java.lang.NoSuchMethodError: org.springframework.messaging.MessageHandlingException: method <init>(Lorg/springframework/messaging/Message;Ljava/lang/Throwable;)V not found
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:96)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:110)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:56)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:246)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421)
at java.lang.Thread.run(Thread.java:748)
Но ничего не идет в очередь ошибок.