kafka.apache.org/documentation/streams/...

тоящее время мы реализуем процесс (используя API-интерфейс процессора Kafka), в котором нам нужно объединить информацию из двух взаимосвязанных событий (сообщений) по теме, а затем переслать эту объединенную информацию. События происходят от устройств IoT, и, поскольку мы хотим сохранить их в порядке, в исходном разделе в качестве ключа используется идентификатор устройства. События также содержат идентификатор корреляции:

ключ

{ deviceId: "..." }

Сообщение

{ deviceId: "...", correlationId: "...", data: ...}

Наш первый подход состоял в том, чтобы создать процессор с подключенным хранилищем состояний, в котором хранится каждое входящее сообщение, используя идентификатор корреляции в качестве ключа. Это позволяет нам запрашивать в хранилище идентификатор корреляции входящего сообщения, и, если в хранилище уже есть сообщение с таким же идентификатором, мы можем объединить информацию, переслать новое событие и удалить запись из хранилища. Таким образом, для каждого идентификатора корреляции происходит следующее: в какой-то момент первое сообщение с этим идентификатором используется и сохраняется, а в другой момент времени второе сообщение с этим идентификатором приводит к удалению записи в хранилище.

Государственный ключ

{ correlationId: "..." }

Государственное значение

{ event: { deviceId: "...", correlationId: "...", data: ... }}

Но теперь нам интересно, как Kafka Streams обрабатывает разные ключи. Мы используем подход Microservice, и будет запущено несколько экземпляров этой службы. Магазин автоматически поддерживается внутренней темой. Рассмотрите возможность повторного масштабирования экземпляров службы, s.t. разделы исходной темы и темы состояния перебалансированы. Возможно ли, чтобы раздел для определенного идентификатора корреляции был назначен другой службе, чем раздел для соответствующего идентификатора устройства? Могли бы мы в конечном итоге оказаться в ситуации, если бы второе событие с тем же идентификатором корреляции использовалось бы экземпляром службы, который не имеет доступа к уже сохраненному первому событию?

Заранее спасибо!

Ответы на вопрос(1)

Ваш ответ на вопрос