Kafka 0.9.0.1 Java Consumer bleibt in awaitMetadataUpdate () hängen
Ich versuche, einen einfachen Kafka-Konsumenten mit der Java-API v0.9.0.1 zum Laufen zu bringen. Der von mir verwendete Kafka-Server ist ein Docker-Container, auf dem auch Version 0.9.0.1 ausgeführt wird. Unten ist der Verbrauchercode:
public class Consumer {
public static void main(String[] args) throws IOException {
KafkaConsumer<String, String> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
consumer = new KafkaConsumer<>(properties);
}
consumer.subscribe(Arrays.asList("messages"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.println("Message received: " + record.value());
}
}catch(WakeupException ex){
System.out.println("Exception caught " + ex.getMessage());
}finally{
consumer.close();
System.out.println("After closing KafkaConsumer");
}
}
}
Beim Starten des Consumer wird jedoch die obige poll (100) -Methode aufgerufen und nie zurückgegeben. Beim Debuggen scheint die Ausführung der folgenden Methode in org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient für immer hängen zu bleiben:
public void awaitMetadataUpdate() {
int version = this.metadata.requestUpdate();
do {
this.poll(9223372036854775807L);
} while(this.metadata.version() == version);
}
(sowohl version als auch this.metadata.version () scheinen immer == 2 zu sein). Obwohl es keine Fehler auslöst, haben die Nachrichten meines Java-Produzenten es nie in die Warteschlange geschafft. Ich habe überprüft, dass ich mithilfe der Befehlszeilen-Kafka-Tools Nachrichten aus der Warteschlange senden und empfangen kann.
Hat jemand eine Ahnung, was hier los ist?