Spark Streaming mapWithState, кажется, периодически перестраивает завершенное состояние

Я работаю над потоковым проектом Scala (2.11) / Spark (1.6.1) и используюmapWithState() отслеживать просмотренные данные из предыдущих партий.

Состояние распределяется в 20 разделах на нескольких узлах, созданных сStateSpec.function(trackStateFunc _).numPartitions(20), В этом состоянии у нас есть только несколько ключей (~ 100), сопоставленных сSets с ~ 160.000 записей, которые растут по всему приложению. Весь штат до3GB, который может обрабатываться каждым узлом в кластере. В каждом пакете некоторые данные добавляются в состояние, но не удаляются до самого конца процесса, т.е. ~ 15 минут.

Следуя пользовательскому интерфейсу приложения, время обработки каждой 10-й партии очень велико по сравнению с другими партиями. Смотрите изображения:

Желтые поля представляют большое время обработки.

Более подробное представление задания показывает, что в этих пакетах происходят в определенный момент, именно тогда, когда все 20 разделов «пропущены». Или это то, что говорит пользовательский интерфейс.

Мое пониманиеskipped является то, что каждый раздел состояния является одной из возможных задач, которая не выполняется, так как не требует пересчета. Тем не менее, я не понимаю, почему количествоskips меняется в зависимости от каждой работы и почему последняя работа требует так много обработки. Более высокое время обработки происходит независимо от размера состояния, оно просто влияет на продолжительность.

Это ошибка вmapWithState() функциональность или это предполагаемое поведение? Требует ли базовая структура данных какой-либо перестановки,Set в состоянии нужно копировать данные? Или это скорее будет недостаток в моем приложении?

Ответы на вопрос(2)

Ваш ответ на вопрос