@dnaumenko Возможно не смещение, а общие метаданные источника :)

2.2 представил структурированный потоковый источник Kafka. Как я понимаю, он использует каталог контрольных точек HDFS для хранения смещений и гарантии доставки сообщений «точно один раз».

Но старые доки (какhttps://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) говорит, что контрольные точки Spark Streaming не восстанавливаются между приложениями или обновлениями Spark и, следовательно, не очень надежны. В качестве решения существует практика поддержки хранения смещений во внешнем хранилище, которое поддерживает транзакции, такие как MySQL или RedshiftDB.

Если я хочу сохранить смещения из источника Kafka в транзакционную БД, как я могу получить смещение из пакета структурированного потока?

Ранее это можно было сделать, приведя RDD кHasOffsetRanges:

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    

Но с новым потоковым API у меня естьDataset изInternalRow и я не могу найти простой способ получить смещения. Sink API имеет толькоaddBatch(batchId: Long, data: DataFrame) метод и как я могу предположить, чтобы получить смещение для данного идентификатора партии?

Ответы на вопрос(3)

Ваш ответ на вопрос