в моем случае может оказаться достаточной идея разделения фрейма данных на маленькие / большие записи.

я есть датафрейм следующей формы:

import scala.util.Random
val localData = (1 to 100).map(i => (i,Seq.fill(Math.abs(Random.nextGaussian()*100).toInt)(Random.nextDouble)))
val df = sc.parallelize(localData).toDF("id","data")

|-- id: integer (nullable = false)
|-- data: array (nullable = true)
|    |-- element: double (containsNull = false)


df.withColumn("data_size",size($"data")).show

+---+--------------------+---------+
| id|                data|data_size|
+---+--------------------+---------+
|  1|[0.77845301260182...|      217|
|  2|[0.28806915178410...|      202|
|  3|[0.76304121847720...|      165|
|  4|[0.57955190088558...|        9|
|  5|[0.82134215959459...|       11|
|  6|[0.42193739241567...|       57|
|  7|[0.76381645621403...|        4|
|  8|[0.56507523859466...|       93|
|  9|[0.83541853717244...|      107|
| 10|[0.77955626749231...|      111|
| 11|[0.83721643562080...|      223|
| 12|[0.30546029947285...|      116|
| 13|[0.02705462199952...|       46|
| 14|[0.46646815407673...|       41|
| 15|[0.66312488908446...|       16|
| 16|[0.72644646115640...|      166|
| 17|[0.32210572380128...|      197|
| 18|[0.66680355567329...|       61|
| 19|[0.87055594653295...|       55|
| 20|[0.96600507545438...|       89|
+---+--------------------+---------+

Теперь я хочу применить дорогой UDF, время вычислений пропорционально размеру массива данных. Я размышляю над тем, как я могу перераспределить свои данные таким образом, чтобы в каждом разделе было примерно одинаковое количество «records * data_size» (то есть точки данных, а не просто записи).

Если просто сделатьdf.repartition(100)Я могу получить некоторые разделы, содержащие несколько очень больших массивов, которые являются узким местом всей стадии искры (все остальные такты уже закончены). Если конечно, я мог бы просто выбрать безумное количество разделов, которое (почти) гарантирует, что каждая запись находится в отдельном разделе. Но есть ли другой способ?

Ответы на вопрос(1)

Решение Вопроса

вы можете увеличить количество разделов. Я обычно использую кратное число ядер: параллелизм по умолчанию в контексте контекста * 2-3.
В вашем случае вы можете использовать больший множитель.

Другое решение было бы фильтровать разделить вашу df следующим образом:

DF только с большими массивамиДФ с остальными

Затем вы можете перераспределить каждый из них, выполнить вычисления и объединить их обратно.

Помните, что перераспределение может быть дорогим, поскольку у вас есть большие ряды, чтобы перетасовать.

Вы можете взглянуть на слайды тезисов (27+):https://www.slideshare.net/SparkSummit/custom-applications-with-sparks-rdd-spark-summit-east-talk-by-tejas-patil

Они испытывали очень плохое искажение данных и должны были справиться с этим интересным способом.

 Raphael Roth18 сент. 2017 г., 09:24
в моем случае может оказаться достаточной идея разделения фрейма данных на маленькие / большие записи.

Ваш ответ на вопрос