Kafka 0.9.0.1 Java Consumer застрял в awaitMetadataUpdate ()

Я пытаюсь заставить простого Kafka Consumer работать с использованием Java API v0.9.0.1. Сервер kafka, который я использую, является док-контейнером, также работающим с версией 0.9.0.1. Ниже приведен код потребителя:

public class Consumer {
    public static void main(String[] args) throws IOException {

        KafkaConsumer<String, String> consumer;
        try (InputStream props = Resources.getResource("consumer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            consumer = new KafkaConsumer<>(properties);
        }

        consumer.subscribe(Arrays.asList("messages"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println("Message received: " + record.value());
            }
        }catch(WakeupException ex){
            System.out.println("Exception caught " + ex.getMessage());
        }finally{
            consumer.close();
            System.out.println("After closing KafkaConsumer");
        }
    }
}

Однако при запуске потребителя он вызывает метод poll (100), описанный выше, и никогда не возвращается. При отладке похоже, что он застрял, запустив следующий метод в org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient навсегда:

public void awaitMetadataUpdate() {
    int version = this.metadata.requestUpdate();

    do {
        this.poll(9223372036854775807L);
    } while(this.metadata.version() == version);

}

(обе версии и this.metadata.version () всегда кажутся == 2). Кроме того, хотя он не выдает ошибок, сообщения от моего производителя java никогда не видели, чтобы попасть в очередь. Я убедился, что используя инструменты командной строки kafka, я могу отправлять и получать сообщения из очереди.

Кто-нибудь знает, что здесь происходит?

 Alex Chirițescu18 авг. 2016 г., 08:44
@cacois, когда вы говорите установить эти переменные на сервере, какой сервер вы имеете в виду? Это машина, выполняющая код Java с потребителем? Или это контейнер докера?
 Alex Chirițescu18 авг. 2016 г., 08:51
Кроме того, вы используете Mac, Linux или Win? :-)
 cacois13 июн. 2016 г., 20:52
Лучано, ты на месте. Установка переменных среды ADVERTISED_PORT и ADVERTISED_HOST на сервере устранила проблему. Это немного сбивает с толку, что без них потребитель / производитель командной строки может работать правильно, но реализации Java не могут.
 Luciano Afranllie13 июн. 2016 г., 19:40
Обычно это проблема, связанная с тем, что ваши брокеры не рекламируют конечную точку, доступную для производителей и потребителей. Ваши брокеры слушают по доступному адресу снаружи? Проверьте брокерскую недвижимость advertized.listeners
 Matthias J. Sax13 июн. 2016 г., 22:22
Кажется, известная проблема:issues.apache.org/jira/browse/KAFKA-3727

Ответы на вопрос(1)

В случае, если это поможет кому-то еще с подобными проблемами, решение для меня было установить следующие переменные среды:

ADVERTISED_HOST=localhost
ADVERTISED_PORT=9092

(конечно, значения здесь могут измениться в соответствии с вашей установкой)

Очевидно, что сценарии потребителя и производителя командной строки могут правильно находить и взаимодействовать с посредником без установки этих переменных env, но реализации Java API не могут. Также не выдается никаких ошибок, просто бесконечный цикл в первом опросе, когда он пытается обновить метаданные.

Ваш ответ на вопрос