Kafka High Level Consumer Извлекает все сообщения из темы с помощью Java API (эквивалентно --from-begin)
Я тестирую Kafka High Level Consumer, используя код ConsumerGroupExample с сайта Kafka. Я хотел бы получить все существующие сообщения по теме «test», которые есть в конфигурации сервера Kafka. Что касается других блогов, то для auto.offset.reset должно быть установлено «наименьшее», чтобы можно было получать все сообщения:
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("auto.offset.reset", "smallest");
props.put("zookeeper.session.timeout.ms", "10000");
return new ConsumerConfig(props);
}
Вопрос, который у меня действительно возникает, заключается в следующем: каков эквивалент вызова API Java для потребителя высокого уровня, который эквивалентен:
bin / kafka-console-consumer.sh --zookeeper localhost: 2181 --топический тест --от начала