Результаты поиска по запросу "kafka-consumer-api"

2 ответа

Коммиты commitSync и commitAsync используют функцию управления смещением kafka, и оба имеют недостатки. Если обработка сообщения завершается успешно и смещение фиксации завершается неудачно (не атомарно), и в то же время происходит перебалансировка раздела, ваше обработанное сообщение снова обрабатывается (дублирующая обработка) другим пользователем. Если у вас все в порядке с обработкой дубликатов сообщений, вы можете перейти к commitAsync (потому что он не блокирует и не обеспечивает низкую задержку, а обеспечивает фиксацию более высокого порядка. Так что у вас все должно быть в порядке). В противном случае перейдите к пользовательскому управлению смещением, которое заботится об атомарности при обработке и обновлении смещения (используйте внешнее хранилище смещений).

а ...

2 ответа

@gstackoverflow да, в соответствии с исходным кодом клиентов Kafka. Я знаю, это звучит глупо, но в целом и с другой точки зрения, чего бы вы хотели достичь? В критически важных системах лучше выполнять фиксацию смещения вручную, если вы уверены, что сообщение обработано, в других случаях - вы можете предпочесть использование менее частой фиксации смещения для ускорения процесса. Однако смещение коммитов не является тривиальным, так как это потребует zookeeper и т. Д

аюэтот [https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1] : Автоматическая фиксация Самый простой способ зафиксировать смещения - ...

2 ответа

Речь шла о фильтрации внутри сервера (брокера), поэтому, когда у вас есть потоки со многими ГБ и низкой избирательностью, большая часть потока не достигает потребителей (приложений). Но KSQL и KStreams являются клиентскими библиотеками == полный поток достигает всех клиентов, и они выполняют фильтрацию.

трю в потоки Кафки. Я хочу отфильтровать свой поток, используя фильтр с очень низкой селективностью (один на несколько тысяч). Я смотрел на этот ...

ТОП публикаций

2 ответа

Это прекрасно работает большую часть времени. Тем не менее, в семантике точно один раз; это не будет работать.

ичок в Kafka и работаю над прототипом для подключения проприетарного потокового сервиса к Kafka. Я хочу получить ключ от последнего сообщения, отправленного по теме, так как наш внутренний поточный потребитель должен войти в систему с ...

1 ответ

Мне пришлось добавить эту строку непосредственно перед вызовом опроса, кажется, что теперь все работает: kafkaConsumer.seekToBeginning (kafkaConsumer.assignment ())

денный ниже клиент Scala kafka не возвращает никаких событий изpoll вызов. Тем не менее, тема правильная, и я вижу события, отправляемые в тему с помощью консоли: /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh ...

0 ответов

@matthias j sax

дняя версия kafka поддерживает семантику «точно один раз» (EoS). Чтобы поддержать это понятие, дополнительные детали добавляются к каждому сообщению. Это означает, что у вашего потребителя; если вы печатаете смещения сообщений, они не обязательно ...

2 ответа

Вы должны использовать ключи для идентификации каждого сообщения, которое вы хотите отправить, т.е. если вы хотите прочитать все сообщения из 100-го сообщения. Считайте с 100-го смещения до message.key <100 сбросить.

лизовал простой процессор записи мертвых букв Kafka. Он отлично работает при использовании записей, созданных производителем консоли. Однако я считаю, что наши приложения Kafka Streams не гарантируют, что при создании записей по темам приемника ...