Понимание искры физического плана

Я пытаюсь понять физические планы на искру, но я не понимаю некоторые части, потому что они кажутся отличными от традиционных rdbms. Например, в приведенном ниже плане это план запроса к таблице кустов. Запрос таков:

select
        l_returnflag,
        l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        avg(l_quantity) as avg_qty,
        avg(l_extendedprice) as avg_price,
        avg(l_discount) as avg_disc,
        count(*) as count_order
    from
        lineitem
    where
        l_shipdate <= '1998-09-16'
    group by
        l_returnflag,
        l_linestatus
    order by
        l_returnflag,
        l_linestatus;


== Physical Plan ==
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0
+- ConvertToUnsafe
   +- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None
      +- ConvertToSafe
         +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedpr#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Fl,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L])
            +- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None
               +- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_exdedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedpri32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedce#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_retulag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L])
                  +- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35]
                     +- Filter (l_shipdate#37 <= 1998-09-16)
                        +- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], astoreRelation default, lineitem, None

Для того, что я понимаю в плане:

Сначала начинается с сканирования таблицы Hive

Затем он фильтрует, используя, где условие

Затем проект, чтобы получить столбцы, которые мы хотим

Тогда TungstenAggregate?

Тогда TungstenExchange?

Тогда TungstenAggregate снова?

Тогда ConvertToSafe?

Затем сортирует окончательный результат

Но я не понимаю 4, 5, 6 и 7 шагов. Ты знаешь, кто они? Я ищу информацию об этом, чтобы понять план, но не нахожу ничего конкретного.

Ответы на вопрос(2)

который управляет данными вне JVM, чтобы сэкономить некоторые затраты на сборку мусора. Вы можете представить себе, что это включает в себя копирование данных из JVM и в нее. Вот и все. В Spark 1.5 вы можете отключить вольфрам черезspark.sql.tungsten.enabled тогда вы увидите «старый» план, в Spark 1.6, я думаю, вы больше не сможете его отключить.

Решение Вопроса

SELECT
    ...  -- not aggregated columns  #1
    ...  -- aggregated columns      #2
FROM
    ...                          -- #3
WHERE
    ...                          -- #4
GROUP BY
    ...                          -- #5
ORDER BY
    ...                          -- #6

Как вы уже подозреваете:

Filter (...) соответствует предикатам вWHERE пункт (#4)Project ... ограничивает количество столбцов до тех, которые требуются объединением (#1 а также#2, а также#4 / #6 если нет вSELECT)HiveTableScan соответствуетFROM пункт (#3)

Остальные части можно отнести к следующему:

#2 отSELECT оговорка -functions поле вTungstenAggregates

GROUP BY пункт (#4):

TungstenExchange / хэшированиеkey поле вTungstenAggregates

#6 - ORDER BY пункт.

Project Tungsten в целом описывает набор оптимизаций, используемых SparkDataFrames (-sets) в том числе:

явное управление памятью сsun.misc.Unsafe, Это означает «собственное» (вне кучи) использование памяти и явное выделение / освобождение памяти вне управления GC. Эти преобразования соответствуютConvertToUnsafe / ConvertToSafe шаги в плане выполнения. Вы можете узнать некоторые интересные детали о небезопасных отПонимание sun.misc.Unsafeгенерация кода - различные приемы метапрограммирования, предназначенные для генерации кода, который лучше оптимизируется во время компиляции. Вы можете думать об этом как о внутреннем компиляторе Spark, который делает такие вещи, как переписывание красивого функционального кода в уродливые циклы for.

Вы можете узнать больше о вольфраме в целом изProject Tungsten: доведение Apache Spark ближе к голому металлу. Apache Spark 2.0: быстрее, проще и умнее предоставляет несколько примеров генерации кода.

TungstenAggregate происходит дважды, потому что данные сначала локально агрегируются в каждом разделе, затем перемешиваются и, наконец, объединяются. Если вы знакомы с RDD API, этот процесс примерно эквивалентенreduceByKey.

Если план выполнения неясен, вы можете также попытаться преобразовать полученный результатDataFrame вRDD и проанализировать выводtoDebugString.

 zero32330 мая 2016 г., 08:41
Functions поле перечисляет все агрегации, которые выполняются на данном этапе, в то время какKey поле описывает группировку. этоdf.groupBy(*key).agg(*functions).
 codin30 мая 2016 г., 04:15
Спасибо за Ваш ответ. Я просто не совсем понял эту часть "# 2 из предложения SELECT - поле функций в TungstenAggregates". Если ты можешь объяснить лучше, будь мил!

Ваш ответ на вопрос