Смотрите также мой ответ.

работал асинхронные службы Spring Cloud Stream и пытаюсь разработать пограничный сервис, который использует @MessagingGateway для обеспечения синхронного доступа к асинхронным службам по своей природе.

В настоящее время я получаю следующую трассировку стека:

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:355)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
... 47 common frames omitted

Мой @MessagingGateway:

@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService {
  @Gateway(requestChannel = AccountChannels.CREATE_ACCOUNT_REQUEST,replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}

Если я получаю сообщение в ответном канале через @StreamListener, оно работает просто отлично:

  @HystrixCommand(commandKey = "acounts-edge:accountCreated", fallbackMethod = "accountCreatedFallback", commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "SEMAPHORE")}, ignoreExceptions = {ClientException.class})
  @StreamListener(AccountChannels.ACCOUNT_CREATED)
  public void accountCreated(Account account, @Header(name = "spanTraceId", required = false) String traceId) {
    try {
      if (log.isInfoEnabled()) {
        log.info(new StringBuilder("Account created: ").append(objectMapper.writeValueAsString(account)).toString());
      }
    } catch (JsonProcessingException e) {
      log.error(e.getMessage(), e);
    }
  }

На стороне производителя я настраиваюrequiredGroups чтобы несколько потребителей могли обработать сообщение, и соответственноgroup конфигурации.

Потребитель:

spring:
  cloud:
    stream:
      bindings:
        create-account-request:
          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          requiredGroups: accounts-service-create-account-request
        account-created:
          binder: rabbit1
          contentType: application/json
          destination: account-created
          group: accounts-edge-account-created

Режиссер:

spring:
  cloud:
    stream:
      bindings:
        create-account-request:
          binder: rabbit1
          contentType: application/json
          destination: create-account-request
          group: accounts-service-create-account-request
        account-created:
          binder: rabbit1
          contentType: application/json
          destination: account-created
          requiredGroups: accounts-edge-account-created

Бит кода на стороне производителя, который обрабатывает запрос и отправляет ответ:

  accountChannels.accountCreated().send(MessageBuilder.withPayload(accountService.createAccount(account)).build());

Я могу отладить и увидеть, что запрос получен и обработан, но когда ответ отправляется на канал ответа, именно тогда происходит ошибка.

Чтобы заставить @MessagingGateway работать, какие конфигурации и / или код мне не хватает? Я знаю, что объединяю Spring Integration и Spring Cloud Gateway, поэтому не уверен, что их совместное использование вызывает проблемы.

Ответы на вопрос(3)

я немного запутался и в том, что вы пытаетесь достичь, но давайте посмотрим, сможем ли мы это выяснить. Смешивание SI и SCSt, безусловно, является естественным, поскольку одно строится на другом, поэтому все должно работать: Вот пример фрагмента кода, который я только что выкопал из старого примера, который выставляет конечную точку REST, но делегирует (через шлюз) выходной канал источника. Посмотрите, поможет ли это:

@EnableBinding(Source.class)
@SpringBootApplication
@RestController
public class FooApplication {
    . . . 

    @Autowired
    private Source channels;

    @Autowired
    private CompletionService completionService;

    @RequestMapping("/complete")
    public String completeRequest(@RequestParam int id) {
        this.completionService.complete("foo");
        return "OK";
    }

    @MessagingGateway
    interface CompletionService {
        @Gateway(requestChannel = Source.OUTPUT)
        void complete(String message);
    }
}
 Artem Bilan13 дек. 2017 г., 20:35
Смотрите также мой ответ.

которое искал. Я взял опубликованный Артемом код и разделил его на две службы: службу шлюза и службу CloudStream. Я также добавил@RestController в целях тестирования. По сути, это имитирует то, что я хотел сделать с длительными очередями. Спасибо Артему за помощь! Я очень ценю ваше время! Я надеюсь, что это помогает другим, кто хочет сделать то же самое.

Код шлюза

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;

import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@EnableBinding({GatewayApplication.GatewayChannels.class})
@SpringBootApplication
public class GatewayApplication {

  interface GatewayChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Input(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Output(TO_UPPERCASE_REQUEST)
    MessageChannel toUppercaseRequest();
  }

  @MessagingGateway
  public interface StreamGateway {
    @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.TO_UPPERCASE_REPLY)
    String process(String payload);
  }

  private static final String ENRICH = "enrich";

  public static void main(String[] args) {
    SpringApplication.run(GatewayApplication.class, args);
  }

  @Bean
  public IntegrationFlow headerEnricherFlow() {
    return IntegrationFlows.from(ENRICH).enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
        .channel(GatewayChannels.TO_UPPERCASE_REQUEST).get();
  }

  @RestController
  public class UppercaseController {
    @Autowired
    StreamGateway gateway;

    @GetMapping(value = "/string/{string}",
        produces = {MediaType.APPLICATION_JSON_VALUE, MediaType.APPLICATION_XML_VALUE})
    public ResponseEntity<String> getUser(@PathVariable("string") String string) {
      return new ResponseEntity<String>(gateway.process(string), HttpStatus.OK);
    }
  }

}

Конфигурация шлюза (application.yml)

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          producer:
            required-groups: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          group: gateway-to-uppercase-reply
server:
  port: 8080

CloudStream Code

package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding({CloudStreamApplication.CloudStreamChannels.class})
@SpringBootApplication
public class CloudStreamApplication {

  interface CloudStreamChannels {

    String TO_UPPERCASE_REPLY = "to-uppercase-reply";
    String TO_UPPERCASE_REQUEST = "to-uppercase-request";

    @Output(TO_UPPERCASE_REPLY)
    SubscribableChannel toUppercaseReply();

    @Input(TO_UPPERCASE_REQUEST)
    MessageChannel toUppercaseRequest();
  }

  public static void main(String[] args) {
    SpringApplication.run(CloudStreamApplication.class, args);
  }

  @StreamListener(CloudStreamChannels.TO_UPPERCASE_REQUEST)
  @SendTo(CloudStreamChannels.TO_UPPERCASE_REPLY)
  public Message<?> process(Message<String> request) {
    return MessageBuilder.withPayload(request.getPayload().toUpperCase())
        .copyHeaders(request.getHeaders()).build();
  }

}

CloudStream Config (application.yml)

spring:
  cloud:
    stream:
      bindings:
        to-uppercase-request:
          destination: to-uppercase-request
          group: stream-to-uppercase-request
        to-uppercase-reply:
          destination: to-uppercase-reply
          producer:
            required-groups: gateway-to-uppercase-reply
server:
  port: 8081
 Vishnu KR11 мая 2018 г., 12:53
работает нормально. Но как преобразовать полезную нагрузку в объект?
Решение Вопроса

ак просто.

Прежде всего мы должны определить для себя, чтоgateway означаетrequest/reply, следовательноcorrelation, И это доступно в@MessagingGateway черезreplyChannel заголовок в лицеTemporaryReplyChannel экземпляр. Даже если у вас есть явныйreplyChannel = AccountChannels.ACCOUNT_CREATEDКорреляция осуществляется только через указанный заголовок и его значение. Тот факт, что этоTemporaryReplyChannel не сериализуем и не может быть передан по сети потребителю на другой стороне.

К счастью, Spring Integration предоставляет нам некоторые решения. Это частьHeaderEnricher и этоheaderChannelsToString вариант позадиHeaderChannelRegistry:

Начиная с Spring Integration 3.0, доступен новый подэлемент; у него нет атрибутов. Это преобразует существующие заголовки replyChannel и errorChannel (если они являются MessageChannel) в строку и сохраняет канал (ы) в реестре для последующего разрешения, когда пришло время отправить ответ или обработать ошибку. Это полезно в случаях, когда заголовки могут быть потеряны; например, при сериализации сообщения в хранилище сообщений или при передаче сообщения через JMS. Если заголовок еще не существует или не является MessageChannel, никаких изменений не производится.

https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-transformation-chapter.html#header-enricher

Но в этом случае вы должны ввести внутренний канал от шлюза кHeaderEnricher и только последний отправит сообщениеAccountChannels.CREATE_ACCOUNT_REQUEST, Так чтоreplyChannel заголовок будет преобразован в строковое представление и сможет путешествовать по сети. На стороне потребителя, когда вы отправляете ответ, вы должны убедиться, что вы передаетеreplyChannel заголовок так же, как есть. Итак, когда сообщение прибудет наAccountChannels.ACCOUNT_CREATED на стороне производителя, где мы имеем это@MessagingGatewayмеханизм корреляции способен преобразовывать идентификатор канала в соответствующийTemporaryReplyChannel и сопоставить ответ с ожидающим вызовом шлюза.

Единственная проблема в том, что ваше приложение-производитель должно быть единственным потребителем в группе дляAccountChannels.ACCOUNT_CREATED - мы должны убедиться, что одновременно работает только один экземпляр в облаке. Просто потому, что только один экземплярTemporaryReplyChannel в его памяти.

Больше информации о шлюзе:https://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#gateway

ОБНОВИТЬ

Некоторый код для помощи:

@EnableBinding(AccountChannels.class)
@MessagingGateway

public interface AccountService {
  @Gateway(requestChannel = AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST, replyChannel = AccountChannels.ACCOUNT_CREATED, replyTimeout = 60000, requestTimeout = 60000)
  Account createAccount(@Payload Account account, @Header("Authorization") String authorization);
}

@Bean
public IntegrationFlow headerEnricherFlow() {
   return IntegrationFlows.from(AccountChannels.INTERNAL_CREATE_ACCOUNT_REQUEST)
            .enrichHeaders(headerEnricher -> headerEnricher.headerChannelsToString())
            .channel(AccountChannels.CREATE_ACCOUNT_REQUEST)
            .get();

}

ОБНОВИТЬ

Несколько простых приложений для демонстрации PoC:

@EnableBinding({ Processor.class, CloudStreamGatewayApplication.GatewayChannels.class })
@SpringBootApplication
public class CloudStreamGatewayApplication {

    interface GatewayChannels {

        String REQUEST = "request";

        @Output(REQUEST)
        MessageChannel request();


        String REPLY = "reply";

        @Input(REPLY)
        SubscribableChannel reply();
    }

    private static final String ENRICH = "enrich";


    @MessagingGateway
    public interface StreamGateway {

        @Gateway(requestChannel = ENRICH, replyChannel = GatewayChannels.REPLY)
        String process(String payload);

    }

    @Bean
    public IntegrationFlow headerEnricherFlow() {
        return IntegrationFlows.from(ENRICH)
                .enrichHeaders(HeaderEnricherSpec::headerChannelsToString)
                .channel(GatewayChannels.REQUEST)
                .get();
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public Message<?> process(Message<String> request) {
        return MessageBuilder.withPayload(request.getPayload().toUpperCase())
                .copyHeaders(request.getHeaders())
                .build();
    }


    public static void main(String[] args) {
        ConfigurableApplicationContext applicationContext =
                SpringApplication.run(CloudStreamGatewayApplication.class, args);

        StreamGateway gateway = applicationContext.getBean(StreamGateway.class);

        String result = gateway.process("foo");

        System.out.println(result);
    }

}

application.yml:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: requests
        output:
          destination: replies
        request:
          destination: requests
        reply:
          destination: replies

я используюspring-cloud-starter-stream-rabbit.

MessageBuilder.withPayload(request.getPayload().toUpperCase())
            .copyHeaders(request.getHeaders())
            .build()

Выполняет ли трюк копирование заголовков запроса в ответное сообщение. Таким образом, шлюз может на стороне ответа преобразовывать идентификатор канала в заголовках в соответствующийTemporaryReplyChannel правильно передать ответ вызывающему абоненту.

Вопрос СКС по этому вопросу:https://github.com/spring-cloud/spring-cloud-stream/issues/815

 Keith Bennett13 дек. 2017 г., 20:58
Спасибо за быстрый ответ, Артем. Я думаю, что понимаю суть того, что вы заявляете, но я хочу убедиться, что понимаю. Вы заявляете, что мне нужно создать bean-компонент HeaderEnricher на стороне @MessagingGateway? Если это так, как мне настроить его атрибуты inputChannel и outputChannel, учитывая, что текущим requestChannel является AccountChannels.CREATE_ACCOUNT_REQUEST?
 Keith Bennett13 дек. 2017 г., 21:01
Кроме того, мне неясно, как TemporaryReplyChannel влияет на создание bean-компонента HeaderEncricher.
 Keith Bennett13 дек. 2017 г., 21:38
Итак, если мой боб имеет@Transformer(inputChannel = AccountChannels.CREATE_ACCOUNT_REQUEST_HEADERS, outputChannel = AccountChannels.CREATE_ACCOUNT_REQUEST)какой канал указывать в качестве аргументаheaderChannelRegistry.channelToChannelName(channel)? Извините, если я упускаю что-то очевидное здесь.
 Artem Bilan13 дек. 2017 г., 21:03
Шлюз создаетTemporaryReplyChannel и заполняет его в сообщение, которое он отправляетrequestChannel.
 Artem Bilan13 дек. 2017 г., 21:04
ЭтоrequestChannel должен быть в качестве входных данных дляHeaderEncricher, что-то внутреннее, не обязательное назначение.outputChannel изHeaderEncricher будет ужеAccountChannels.CREATE_ACCOUNT_REQUEST

Ваш ответ на вопрос