Netty синхронный клиент с асинхронными звонками

Я создаю сервер, который принимает команды из многочисленных источников, таких как JMS, SNMP, HTTP и т. Д. Все они асинхронные и работают нормально. Сервер поддерживает одно соединение с одним элементом устаревшего оборудования, которое имеет архитектуру запроса / ответа с настраиваемым протоколом TCP. В идеале я хотел бы одну команду, как этот метод блокировки типа

public Response issueCommandToLegacyHardware(Command command)

или этот метод асинхронного типа

public Future<Response> issueCommandToLegacyHardware(Command command)

Я относительно новичок в Netty и асинхронном программировании, в основном изучаю его по мере продвижения. Моя текущая мысль заключается в том, что мойLegacyHardwareClient класс будет иметьpublic synchronized issueCommandToLegacyHardware(Command command), сделает запись на клиентский канал на устаревшее оборудование, затемtake() изSynchronousQueue<Response> который будет блокировать. ChannelInboundHandler в конвейере будетoffer() a Response кSynchronousQueue>Response> что позволитtake() разблокировать и получить данные.

Это слишком запутанно? Есть ли примеры синхронных реализаций клиента Netty, на которые я могу посмотреть? Есть ли лучшие практики для Нетти? Очевидно, я мог бы использовать только стандартные сокеты Java, однако возможности Netty для синтаксического анализа пользовательских протоколов и простота обслуживания слишком велики, чтобы от них отказываться.

ОБНОВЛЕНИЕ: только что касается реализации, я использовал ArrayBlockingQueue <> () и я использовал put () и remove (), а не offer () и remove (). Потому что я хотел убедиться, что последующие запросы к устаревшему оборудованию отправлялись только тогда, когда на любые активные запросы был получен ответ, так как поведение устаревшего оборудования точно не известно иначе.

Причина, по которой мне не работали offer () и remove (), заключалась в том, что команда offer () ничего не пропустила бы, если бы не было активно блокирующего запроса take () ни с другой стороны. Верно обратное, что remove () ничего не вернет, если не будет блокирующего вызова put () для вставки данных. Я не мог использовать put () / remove (), так как оператор remove () никогда не будет достигнут, так как в канал не было записано никакого запроса на запуск события, из которого будет вызываться remove (). Я не мог использовать offer () / take (), поскольку предложение offer () вернуло бы false, поскольку вызов take () еще не был выполнен. Используя ArrayBlockingQueue <> () с емкостью 1, он гарантировал, что одновременно может быть выполнена только одна команда. Любые другие команды будут блокироваться, пока не будет достаточно места для вставки, с емкостью 1 это означает, что она должна быть пустой. Очистка очереди была произведена после получения ответа от устаревшего оборудования. Это обеспечивало хорошее синхронное поведение по отношению к устаревшему оборудованию, но обеспечивало асинхронный API для пользователей устаревшего оборудования, для которого их много.

Ответы на вопрос(1)

Ваш ответ на вопрос