Gravando quadros de dados R retornados do SparkR ::: map
Estou usando o mapa SparkR ::: e minha função retorna um quadro de dados R grande para cada linha de entrada, cada uma da mesma forma. Gostaria de escrever esses quadros de dados como arquivos em parquet sem 'colecioná-los'. Posso mapear write.df na minha lista de saída? Posso fazer com que as tarefas do trabalhador escrevam o parquet?
Agora tenho umtrabalhando. exemplo. Estou feliz com isso, pois não esperava que a redução implicitamente 'coletasse', pois queria escrever o DF resultante como Parquet.
Além disso, não estou convencido de que ::: map realmente faça algo em paralelo. Também preciso sempre chamar 'paralelismo'?
#! /usr/bin/Rscript
library(SparkR, lib.loc="/opt/spark-1.5.1-bin-without-hadoop/R/lib")
source("jdbc-utils.R")
options(stringsAsFactors = FALSE)
# I dislike having these here but when I move them into main(), it breaks - the sqlContext drops.
assign("sc", sparkR.init(master = "spark://poc-master-1:7077",
sparkHome = "/opt/spark-1.5.1-bin-without-hadoop/",
appName = "Peter Spark test",
list(spark.executor.memory="4G")), envir = .GlobalEnv)
assign("sqlContext", sparkRSQL.init(sc), envir =.GlobalEnv)
#### MAP function ####
run.model <- function(v) {
x <- v$xs[1]
y <- v$ys[1]
startTime <- format(Sys.time(), "%F %T")
xs <- c(1:x)
endTime <- format(Sys.time(), "%F %T")
hostname <- system("hostname", intern = TRUE)
xys <- data.frame(xs,y,startTime,endTime,hostname,stringsAsFactors = FALSE)
return(xys)
}
# HERE BE THE SCRIPT BIT
main <- function() {
# Make unique identifiers for each run
xs <- c(1:365)
ys <- c(1:1)
xys <- data.frame(xs,ys,stringsAsFactors = FALSE)
# Convert to Spark dataframe for mapping
sqlContext <- get("sqlContext", envir = .GlobalEnv)
xys.sdf <- createDataFrame(sqlContext, xys)
# Let Spark do what Spark does
output.list <- SparkR:::map(xys.sdf, run.model)
# Reduce gives us a single R dataframe, which may not be what we want.
output.redux <- SparkR:::reduce(output.list, rbind)
# Or you can have it as a list of data frames.
output.col <- collect(output.list)
return(NULL)
}