Результаты поиска по запросу "apache-kafka-streams"

2 ответа

Вы должны использовать ключи для идентификации каждого сообщения, которое вы хотите отправить, т.е. если вы хотите прочитать все сообщения из 100-го сообщения. Считайте с 100-го смещения до message.key <100 сбросить.

лизовал простой процессор записи мертвых букв Kafka. Он отлично работает при использовании записей, созданных производителем консоли. Однако я считаю, что наши приложения Kafka Streams не гарантируют, что при создании записей по темам приемника ...

1 ответ

 будет нарушен. Я понимаю, что это может быть неуклюжим в сценарии тестирования, однако в реальном развертывании это потоковая обработка и, следовательно, никаких проблем.

исал этот код в потоковом приложении Kafka: KGroupedStream<String, foo> groupedStream = stream.groupByKey(); groupedStream.windowedBy( SessionWindows.with(Duration.ofSeconds(3)).grace(Duration.ofSeconds(3))) .aggregate(() -> {...}) ...

2 ответа

Я думаю, что вы можете использовать что-то вроде этого:

ли способ сделать операцию, подобную ветви, но поместить запись в каждый выходной поток, предикат которого оценивается как true? Brach помещает запись в первое совпадение (документация: запись помещается в один и только один выходной поток при ...

ТОП публикаций

1 ответ

Спасибо! Прошу об обновлениях в GlobalKTable от agg_data_in? Согласно моим знаниям, обновления в GlobalKTable перезаписываются (если новые данные приходят и находят там свой ключ, они перезаписывают старые данные / значение).

спользовании KTable потоки Kafka не позволяют экземплярам читать из нескольких разделов определенной темы, когда количество экземпляров / потребителей равно числу разделов. Я пытался добиться этого с помощью GlobalKTable, проблема в том, что ...

1 ответ

 хранить данные (используя сжатие журнала, чтобы сохранить его навсегда).

ользую потоки Kafka для обработки данных в реальном времени, в задачах потоков Kafka мне нужно получить доступ к MySQL для запроса данных, и мне нужно вызвать другую службу restful. Все операции являются синхронными. Я боюсь, что синхронный ...

0 ответов

Извините - не уверен, куда это поставить - я не парень из потоков; Я просто взломал ваши материалы, чтобы исключить встроенного брокера (так как это в вашем названии вопроса). Рад попробовать для вас, если вы можете быть более откровенным с тем, что вы хотите, чтобы я сделал. Но это будет завтра; поздно.

хожу в поисках знания тайного. Во-первых, у меня есть две пары тем, по одной теме в каждой паре, которая входит в другую тему. Два KTables формируются последними темами, которые используются в KTable + KTable leftJoin. Проблема в том, что ...

1 ответ

Не стесняйтесь принять мой ответ, если он работает сейчас :)

даю данные обработки KTable из KStream. Но когда я запускаю сообщения-захоронения с ключом и нулевой полезной нагрузкой, это не удаляет сообщение из KTable. образец - public KStream<String, GenericRecord> ...

0 ответов

Если вы не осколите состояние, основываясь на том же ключе, что и входные темы, вы потеряете локальность / совместное разбиение данных, и, таким образом, ваше приложение будет вычислять неверные результаты.

арственные магазины в Kafka Streams создаются внутри страны. Государственные хранилища разделены по ключам, но не позволяют обеспечить разделение, кромепо ключу (насколько мне известно). ВОПРОСОВ Какконтролировать количество разделов в ...

1 ответ

kafka.apache.org/documentation/streams/...

тоящее время мы реализуем процесс (используя API-интерфейс процессора Kafka), в котором нам нужно объединить информацию из двух взаимосвязанных событий (сообщений) по теме, а затем переслать эту объединенную информацию. События происходят от ...

1 ответ

Загрузка данных в тему Kafka и выполнение соединения KStream-KTable рекомендуется - однако выполнение внешнего вызова не обязательно является анти-паттерном. Помимо оценки производительности, хорошим свойством загрузки данных в таблицу KTable является отделение вашего приложения от внешней системы. Если вы делаете вызов REST, а внешняя система не работает, как вы справитесь с этим? С данными, загруженными в KTable, этот вопрос разрешается естественным образом. Кроме того, если вы хотите включить обработку ровно один раз, вызов REST является побочным эффектом и не покрывается ровно один раз, пока выполняется соединение.

ый подход для реализации варианта использования обогащения входящего потока событий, хранящихся в Kafka, со справочными данными - это вызовmap() оператор внешнего сервиса REST API, который предоставляет эти справочные данные для каждого входящего ...