Результаты поиска по запросу "apache-kafka"

1 ответ

 это также не разрешается, очевидно, что что-то мешает kafka-consumer-groups.sh прочитать тему __consumer_offsets (при запуске с параметром --bootstrap-server, в противном случае она читает ее из zookeeper) и отобразить результаты, это то, что Кафка Инструмент обходится без проблем, и я считаю, что эти две проблемы связаны. И причина, по которой я считаю, что тема не уплотнена, заключается в том, что в ней есть сообщения с точно таким же ключом (и даже временной меткой), более старые, чем должны, согласно настройкам брокера. Kafka Tool также игнорирует определенные записи и не интерпретирует их как группы потребителей на этом экране. Почему kafka-consumer-groups.sh игнорирует все, что, вероятно, связано с некоторым повреждением этих записей.

я есть два вида записей журнала в server.log Первый вид: ПРЕДУПРЕЖДЕНИЕ Сброс первого грязного смещения__consumer_offsets-6 регистрировать начальное смещение 918, так какcheckpointed смещение 903 недопустимо. ...

2 ответа

Вы должны использовать ключи для идентификации каждого сообщения, которое вы хотите отправить, т.е. если вы хотите прочитать все сообщения из 100-го сообщения. Считайте с 100-го смещения до message.key <100 сбросить.

лизовал простой процессор записи мертвых букв Kafka. Он отлично работает при использовании записей, созданных производителем консоли. Однако я считаю, что наши приложения Kafka Streams не гарантируют, что при создании записей по темам приемника ...

0 ответов

@matthias j sax

дняя версия kafka поддерживает семантику «точно один раз» (EoS). Чтобы поддержать это понятие, дополнительные детали добавляются к каждому сообщению. Это означает, что у вашего потребителя; если вы печатаете смещения сообщений, они не обязательно ...

ТОП публикаций

1 ответ

 будет нарушен. Я понимаю, что это может быть неуклюжим в сценарии тестирования, однако в реальном развертывании это потоковая обработка и, следовательно, никаких проблем.

исал этот код в потоковом приложении Kafka: KGroupedStream<String, foo> groupedStream = stream.groupByKey(); groupedStream.windowedBy( SessionWindows.with(Duration.ofSeconds(3)).grace(Duration.ofSeconds(3))) .aggregate(() -> {...}) ...

2 ответа

Я думаю, что вы можете использовать что-то вроде этого:

ли способ сделать операцию, подобную ветви, но поместить запись в каждый выходной поток, предикат которого оценивается как true? Brach помещает запись в первое совпадение (документация: запись помещается в один и только один выходной поток при ...

2 ответа

Кроме того, не делайте повторные перезапуски брокера, как это. Вы должны сначала определить, кто является Контроллером, а затем закрыть его последним.

работает пять узлов кластера с 3 зоопарками - все они являются виртуальными машинами. Мы должны часто перезагружать кластер для некоторых аппаратных исправлений. Мы написали ANSI-скрипт для отключения кластера в следующем порядке: Остановите ...

1 ответ

Мне пришлось добавить эту строку непосредственно перед вызовом опроса, кажется, что теперь все работает: kafkaConsumer.seekToBeginning (kafkaConsumer.assignment ())

денный ниже клиент Scala kafka не возвращает никаких событий изpoll вызов. Тем не менее, тема правильная, и я вижу события, отправляемые в тему с помощью консоли: /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh ...

1 ответ

С приведенными ниже изменениями код работает нормально.

виваюсьSpring Boot + Apache Kafka + Apache Zookeeper пример. Я установил / настройкаApache Zookeeper and Apache Kafka на моей локальной машине windows. Я взял ссылку по ...

2 ответа

Согласитесь, вы должны увеличить его вручную, так как тема уже создана. Продолжайте хорошую работу!!

Quickstart [https://kafka.apache.org/quickstart] Использование Kafka v2.1.0 на RHEL v6.9 Потребитель не получает данные, когда один из брокеров Kafka не работает. Шаги выполнены: 1. Запустите зоопарка 2. Запустите Kafka-Server0 (локальный ...

1 ответ

Спасибо! Прошу об обновлениях в GlobalKTable от agg_data_in? Согласно моим знаниям, обновления в GlobalKTable перезаписываются (если новые данные приходят и находят там свой ключ, они перезаписывают старые данные / значение).

спользовании KTable потоки Kafka не позволяют экземплярам читать из нескольких разделов определенной темы, когда количество экземпляров / потребителей равно числу разделов. Я пытался добиться этого с помощью GlobalKTable, проблема в том, что ...