Como conectar-se ao processamento de mensagens antes / depois usando @RabbitListener
Problema: Estou migrando da interface MessageListener impl para @RabbitListener. Eu tinha uma lógica como essa em que eu estava processando as mensagens "pré" e "post" em um MessageListener herdado por várias classes
exemplo:
public AbstractMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
//do some pre message processing
process(Message message);
// do some post message processing
}
protected abstract void process(Message message);
}
Pergunta, questão: Existe uma maneira de obter algo semelhante usando a anotação @RabbitListener Onde posso herdar a lógica de processamento de mensagens pré / pós sem ter que reimplementar ou chamar o processamento de mensagens pré / pós dentro de cada anotação @RabbitListener filho e manter um tempo personalizável assinaturas de método para o filho @RabbitListener? Ou isso está sendo muito ganancioso?
Exemplo de resultado desejado:
public class SomeRabbitListenerClass {
@RabbitListener( id = "listener.mypojo",queues = "${rabbitmq.some.queue}")
public void listen(@Valid MyPojo myPojo) {
//...
}
}
public class SomeOtherRabbitListenerClass {
@RabbitListener(id = "listener.orders",queues ="${rabbitmq.some.other.queue}")
public void listen(Order order, @Header("order_type") String orderType) {
//...
}
}
com esses @RabbitListener (s) utilizando o mesmo processamento herdado de mensagens pré / pós
Vejo que há um argumento 'containerFactory' na anotação @RabbitListener, mas já estou declarando um na configuração ... e tenho certeza de como obter a herança que desejo com um containerFactory personalizado.
Resposta atualizada: Foi isso que acabei fazendo.
Definição de conselho:
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.springframework.amqp.core.Message;
/**
* AOP Around advice wrapper. Every time a message comes in we can do
* pre/post processing by using this advice by implementing the before/after methods.
* @author sjacobs
*
*/
public class RabbitListenerAroundAdvice implements MethodInterceptor {
/**
* place the "AroundAdvice" around each new message being processed.
*/
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Message message = (Message) invocation.getArguments()[1];
before(message)
Object result = invocation.proceed();
after(message);
return result;
}
declarar beans: Em sua configuração rabbitmq, declare o conselho como um bean Spring e passe-o para o rabbitListenerContainerFactory # setAdviceChain (...)
//...
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory( cachingConnectionFactory() );
factory.setTaskExecutor(threadPoolTaskExecutor());
factory.setMessageConverter(jackson,2JsonMessageConverter());
factory.setAdviceChain(rabbitListenerAroundAdvice());
return factory;
}
@Bean
public RabbitListenerAroundAdvice rabbitListenerAroundAdvice() {
return new RabbitListenerAroundAdvice();
}
// ...