HeartbeatServer

уйста,
Не могли бы вы помочь с реализацией простого, эхо-стиля, службы сокетов Heartbeat TCP в Spring Integration DSL? Точнее, как подключить адаптер / обработчик / шлюз кIntegrationFlows на стороне клиента и сервера. Трудно найти практические примеры для взаимодействия клиент-сервер Spring Integration DSL и TCP / IP.

Я думаю, я прибил большую часть кода, это просто о том, чтобы объединить все вместе вIntegrationFlow.

Есть образецэхо-сервис в примерах SI, но написано в «старой» конфигурации XML, и я действительно изо всех сил пытаюсь преобразовать ее в конфигурацию с помощью кода.

Служба «Мой пульс» - это простой сервер, ожидающий, пока клиент запросит «статус», и ответит «ОК».

нет@ServiceActivatorнет@MessageGatewaysбез проксирования, все явно и многословно; управляемый простым запланированным исполнителем JDK на стороне клиента; сервер и клиент в отдельных конфигах и проектах.

HeartbeatClientConfig

@Configuration
@EnableIntegration
public class HeartbeatClientConfig {

    @Bean
    public MessageChannel outboudChannel() {
        return new DirectChannel();
    }

    @Bean
    public PollableChannel inboundChannel() {
        return new QueueChannel();
    }

    @Bean
    public TcpNetClientConnectionFactory connectionFactory() {
        TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 7777);
        return connectionFactory;
    }

    @Bean
    public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
            TcpNetClientConnectionFactory connectionFactory,
            MessageChannel inboundChannel) {
        TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
        heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
        heartbeatReceivingMessageAdapter.setOutputChannel(inboundChannel); // ???
        heartbeatReceivingMessageAdapter.setClientMode(true);
        return heartbeatReceivingMessageAdapter;
    }

    @Bean
    public TcpSendingMessageHandler heartbeatSendingMessageHandler(
            TcpNetClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
        heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
        return heartbeatSendingMessageHandler;
    }

    @Bean
    public IntegrationFlow heartbeatClientFlow(
            TcpNetClientConnectionFactory connectionFactory,
            TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
            TcpSendingMessageHandler heartbeatSendingMessageHandler,
            MessageChannel outboudChannel) {
        return IntegrationFlows
                .from(outboudChannel) // ??????
                .// adapter ???????????
                .// gateway ???????????
                .// handler ???????????
                .get();
    }

    @Bean
    public HeartbeatClient heartbeatClient(
            MessageChannel outboudChannel,
            PollableChannel inboundChannel) {
        return new HeartbeatClient(outboudChannel, inboundChannel);
    }
}

HeartbeatClient

public class HeartbeatClient {
    private final MessageChannel outboudChannel;
    private final PollableChannel inboundChannel;
    private final Logger log = LogManager.getLogger(HeartbeatClient.class);

    public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat client...");
        start();
    }

    public void start() {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            while (true) {
                try {
                    log.info("Sending Heartbeat");
                    outboudChannel.send(new GenericMessage<String>("status"));
                    Message<?> message = inboundChannel.receive(1000);
                    if (message == null) {
                        log.error("Heartbeat timeouted");
                    } else {
                        String messageStr = new String((byte[]) message.getPayload());
                        if (messageStr.equals("OK")) {
                            log.info("Heartbeat OK response received");
                        } else {
                            log.error("Unexpected message content from server: " + messageStr);
                        }
                    }
                } catch (Exception e) {
                    log.error(e);
                }
            }
        }, 0, 10000, TimeUnit.SECONDS);
    }
}

HeartbeatServerConfig

@Configuration
@EnableIntegration
public class HeartbeatServerConfig {

    @Bean
    public MessageChannel outboudChannel() {
        return new DirectChannel();
    }

    @Bean
    public PollableChannel inboundChannel() {
        return new QueueChannel();
    }

    @Bean
    public TcpNetServerConnectionFactory connectionFactory() {
        TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(7777);
        return connectionFactory;
    }

    @Bean
    public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
            TcpNetServerConnectionFactory connectionFactory,
            MessageChannel outboudChannel) {
        TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
        heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
        heartbeatReceivingMessageAdapter.setOutputChannel(outboudChannel);
        return heartbeatReceivingMessageAdapter;
    }

    @Bean
    public TcpSendingMessageHandler heartbeatSendingMessageHandler(
            TcpNetServerConnectionFactory connectionFactory) {
        TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
        heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
        return heartbeatSendingMessageHandler;
    }

    @Bean
    public IntegrationFlow heartbeatServerFlow(
            TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
            TcpSendingMessageHandler heartbeatSendingMessageHandler,
            MessageChannel outboudChannel) {
        return IntegrationFlows
                .from(heartbeatReceivingMessageAdapter) // ???????????????
                .handle(heartbeatSendingMessageHandler) // ???????????????
                .get();
    }

    @Bean
    public HeartbeatServer heartbeatServer(
            PollableChannel inboundChannel, 
            MessageChannel outboudChannel) {
        return new HeartbeatServer(inboundChannel, outboudChannel);
    }
}

HeartbeatServer

public class HeartbeatServer {
    private final PollableChannel inboundChannel;
    private final MessageChannel outboudChannel;
    private final Logger log = LogManager.getLogger(HeartbeatServer.class);

    public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat");
        start();
    }

    public void start() {
        Executors.newSingleThreadExecutor().execute(() -> {
            while (true) {
                try {
                    Message<?> ,message = inboundChannel.receive(1000);
                    if (message == null) {
                        log.error("Heartbeat timeouted");
                    } else {
                        String messageStr = new String((byte[]) message.getPayload());
                        if (messageStr.equals("status")) {
                            log.info("Heartbeat received");
                            outboudChannel.send(new GenericMessage<>("OK"));
                        } else {
                            log.error("Unexpected message content from client: " + messageStr);
                        }
                    }
                } catch (Exception e) {
                    log.error(e);
                }
            }
        });
    }
}

Бонусный вопрос

Почему канал может быть установлен на TcpReceivingChannelAdapter (входящий адаптер), но не на TcpSendingMessageHandler (исходящий адаптер)?

ОБНОВИТЬ
Вот полный исходный код проекта, если кому-то интересно, чтобы кто-нибудь его клонировал:
https://bitbucket.org/espinosa/spring-integration-tcp-demo
Я постараюсь поместить все предложенные решения там.

Ответы на вопрос(2)

Ваш ответ на вопрос