Spark: как узнать количество написанных строк?

Мне интересно, есть ли способ узнать количество строк, записанных с помощью операции сохранения Spark. Я знаю, что достаточно сделать подсчет на СДР, прежде чем писать его, но я хотел бы знать, есть ли способ получить ту же информацию, не делая этого.

Спасибо марко

 samthebest29 мая 2016 г., 13:38
@amit_kumar Я не думаю, что это дубликат, я думаю, что он хочет посчитать его и сохранить без необходимости дважды передавать данные.
 Amit Kumar28 мая 2016 г., 21:19
Это может быть дубликатомstackoverflow.com/questions/28413423/...

Ответы на вопрос(3)

Решение Вопроса

вы можете добавить пользовательский слушатель и извлечь количество записанных строк изoutputMetrics, Очень простой пример может выглядеть так:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWrittenCount = 0L

sc.addSparkListener(new SparkListener() { 
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    synchronized {
      recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten 
    }
  }
})

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
// Long = 10

но эта часть API предназначена для внутреннего использования.

 Joha07 янв. 2019 г., 13:22
Есть ли способ сделать то же самое при написании записей с использованием:spark.write.avro(...)
 mgaido29 мая 2016 г., 19:47
спасибо, но с помощью Spark 1.5.2 это не работает. Вместо этого вы должны сделать:recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.get.recordsWritten
 Konstantin Kulagin16 янв. 2018 г., 15:39
Не лучше ли использовать атомарный recordsWrittenCount вместо блока синхронизации?
 zero32329 мая 2016 г., 20:05
Как уже упоминалось, это внутренний API, поэтому нет гарантии, что он будет стабильным.

ак указано в различных комментариях), тем не менее этот ответ подойдет большинству.

Наиболее эффективным подходом является использование Аккумулятора:http://spark.apache.org/docs/latest/programming-guide.html#accumulators

val accum = sc.accumulator(0L)

data.map { x =>
  accum += 1
  x
}
.saveAsTextFile(path)

val count = accum.value

Затем вы можете обернуть это в полезного сутенера:

implicit class PimpedStringRDD(rdd: RDD[String]) {
  def saveAsTextFileAndCount(p: String): Long = {
    val accum = rdd.sparkContext.accumulator(0L)

    rdd.map { x =>
      accum += 1
      x
    }
    .saveAsTextFile(p)

    accum.value
  }
}

Так что вы можете сделать

val count = data.saveAsTextFileAndCount(path)
 mgaido29 мая 2016 г., 13:59
Что ж, спасибо за ваш ответ ... хотя я продолжаю удивляться, как они могут показывать эту информацию в веб-интерфейсе, если нет внутреннего счетчика ...
 mgaido29 мая 2016 г., 13:44
Я знаю такой подход, но я бы хотел избежать его по двум основным причинам: использование его в преобразовании означает, что результат не может быть доверенным в случае некоторых сбоев; в любом случае есть (немного) накладные расходы. Мне было просто интересно, есть ли счетчик, доступный каким-то образом, как в mapreduce, так как в веб-интерфейсе отображается количество записанных строк ...
 samthebest29 мая 2016 г., 14:01
@ mark91 А, ну, вы могли бы клонировать код пользовательского интерфейса и копаться в нем, я думаю. Прочитав документацию, код, который я дал, в порядке. (Spark говорит, что защищает от перезапущенных задач). Похоже, что вы хотите защититься, когда RDD преобразуется несколько раз, но код, который я дал rdd, недоступен вне области Pimps. Он будет накапливаться только перед записью и накапливаться только один раз.
 Konstantin Kulagin13 мая 2017 г., 16:32
Насколько я понимаю, изменение аккумулятора в действии преобразования (например, в вашем случае map) может привести к недопустимому значению.
 zero32329 мая 2016 г., 14:30
@amit_kumar Если СДР не кэшируется, это должно быть более эффективным, чем отдельный подсчет, поскольку данные будут реализованы только один раз.
 Amit Kumar29 мая 2016 г., 14:18
count = rdd.count (); rdd.saveAsTextFile (р); Это в любом случае лучше?

Если вы посмотрите на

taskEnd.taskInfo.accumulables

Вы увидите, что это связано со следующимиAccumulableInfo вListBuffer в последовательном порядке.

AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None), 
AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None), AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None), 
AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None), 
AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None), 
AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None), 
AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql)

Вы можете ясно видеть, что количество выходных строк находится на 7-й позиции listBuffer, поэтому правильный способ получить количество записываемых строк:

taskEnd.taskInfo.accumulables(6).value.get

Мы можем получить строки, написанные следующим образом (я только что изменил ответ @ zero323)

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWrittenCount = 0L

sc.addSparkListener(new SparkListener() { 
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    synchronized {
      recordsWrittenCount += taskEnd.taskInfo.accumulables(6).value.get.asInstanceOf[Long] 
    }
  }
})

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount

Ваш ответ на вопрос