@dnaumenko Возможно не смещение, а общие метаданные источника :)
2.2 представил структурированный потоковый источник Kafka. Как я понимаю, он использует каталог контрольных точек HDFS для хранения смещений и гарантии доставки сообщений «точно один раз».
Но старые доки (какhttps://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) говорит, что контрольные точки Spark Streaming не восстанавливаются между приложениями или обновлениями Spark и, следовательно, не очень надежны. В качестве решения существует практика поддержки хранения смещений во внешнем хранилище, которое поддерживает транзакции, такие как MySQL или RedshiftDB.
Если я хочу сохранить смещения из источника Kafka в транзакционную БД, как я могу получить смещение из пакета структурированного потока?
Ранее это можно было сделать, приведя RDD кHasOffsetRanges
:
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
Но с новым потоковым API у меня естьDataset
изInternalRow
и я не могу найти простой способ получить смещения. Sink API имеет толькоaddBatch(batchId: Long, data: DataFrame)
метод и как я могу предположить, чтобы получить смещение для данного идентификатора партии?