Большое спасибо за ваш ответ. Я думаю, я посмотрю, сработает ли сбор данных в R для моей программы. Если нет, я посмотрю в расширениях Scala.

я есть широкий фрейм данных из нескольких тысяч столбцов примерно на миллион строк, для которого я хотел бы рассчитать итоговые суммы строк. Мое решение пока ниже. Я использовал:dplyr - сумма нескольких столбцов с использованием регулярных выражений а такжеhttps://github.com/tidyverse/rlang/issues/116

library(sparklyr)
library(DBI)
library(dplyr)
library(rlang)

sc1 <- spark_connect(master = "local")
wide_df = as.data.frame(matrix(ceiling(runif(2000, 0, 20)), 10, 200))
wide_sdf = sdf_copy_to(sc1, wide_df, overwrite = TRUE, name = "wide_sdf")

col_eqn = paste0(colnames(wide_df), collapse = "+" )

# build up the SQL query and send to spark with DBI
query = paste0("SELECT (",
               col_eqn,
               ") as total FROM wide_sdf")

dbGetQuery(sc1, query)

# Equivalent approach using dplyr instead
col_eqn2 = quo(!! parse_expr(col_eqn))

wide_sdf %>% 
    transmute("total" := !!col_eqn2) %>%
        collect() %>%
            as.data.frame()

Проблемы возникают, когда количество столбцов увеличивается. По искровому SQL, кажется, он рассчитывается по одному элементу за раз, т.е. (((V1 + V1) + V3) + V4) ...) Это приводит к ошибкам из-за очень высокой рекурсии.

У кого-нибудь есть альтернативный, более эффективный подход? Любая помощь приветствуется.

Ответы на вопрос(1)

вы достигнете некоторых пределов рекурсии (даже если вы обойдете анализатор SQL, достаточно большая сумма выражений приведет к сбою планировщика запросов). Есть несколько медленных решений:

использованиеspark_apply (за счет конвертации в и из R):

wide_sdf %>% spark_apply(function(df) { data.frame(total = rowSums(df)) })

Преобразовать в длинный формат и агрегировать (за счетexplode и перемешать)

key_expr <- "monotonically_increasing_id() AS key"

value_expr <- paste(
 "explode(array(", paste(colnames(wide_sdf), collapse=","), ")) AS value"
)

wide_sdf %>% 
  spark_dataframe() %>% 
  # Add id and explode. We need a separate invoke so id is applied
  # before "lateral view"
  sparklyr::invoke("selectExpr", list(key_expr, "*")) %>% 
  sparklyr::invoke("selectExpr", list("key", value_expr)) %>% 
  sdf_register() %>% 
  # Aggregate by id
  group_by(key) %>% 
  summarize(total = sum(value)) %>% 
  arrange(key)

Чтобы получить что-то более эффективное, вы должны написать расширение Scala и применить сумму непосредственно кRow объект, без взрыва:

package com.example.sparklyr.rowsum

import org.apache.spark.sql.{DataFrame, Encoders}

object RowSum {
  def apply(df: DataFrame, cols: Seq[String]) = df.map {
    row => cols.map(c => row.getAs[Double](c)).sum
  }(Encoders.scalaDouble)
}

а также

invoke_static(
  sc, "com.example.sparklyr.rowsum.RowSum", "apply",
  wide_sdf %>% spark_dataframe
) %>% sdf_register()
 swany15 дек. 2017 г., 13:10
Большое спасибо за ваш ответ. Я думаю, я посмотрю, сработает ли сбор данных в R для моей программы. Если нет, я посмотрю в расширениях Scala.

Ваш ответ на вопрос