SparkR: Split-Apply-Combine im Dplyr-Stil für DataFrame
Unter dem vorherigen RDD-Paradigma könnte ich einen Schlüssel angeben und dann eine Operation den RDD-Elementen zuordnen, die jedem Schlüssel entsprechen. Ich sehe keinen klaren Weg, um dies mit DataFrame in SparkR ab 1.5.1 zu tun. Was ich tun möchte, ist so etwas wie eindplyr
Betrieb
new.df <- old.df %>%
group_by("column1") %>%
do(myfunc(.))
Ich habe derzeit einen großen SparkR-Datenrahmen der Form:
timestamp value id
2015-09-01 05:00:00.0 1.132 24
2015-09-01 05:10:00.0 null 24
2015-09-01 05:20:00.0 1.129 24
2015-09-01 05:00:00.0 1.131 47
2015-09-01 05:10:00.0 1.132 47
2015-09-01 05:10:00.0 null 47
Ich habe sortiert nach,id
undtimestamp
.
Ich möchte nach @ gruppierid
, aber ich möchte nicht aggregieren. Stattdessen möchte ich eine Reihe von Transformationen und Berechnungen für jede Gruppe durchführen - zum Beispiel Interpolation, um NAs zu füllen (die generiert werden, wenn ichcollect
der DataFrame und dann konvertierenvalue
zu numerisch). Ich habe mit @ getestagg
, aber während meine Berechnungen zu laufen scheinen, werden die Ergebnisse nicht zurückgegeben, da ich in @ keinen einzelnen Wert zurückgemyfunc
:
library(zoo)
myfunc <- function(df) {
df.loc <- collect(df)
df.loc$value <- as.numeric(df.loc$value)
df.loc$newparam <- na.approx(df.loc$value, na.rm = FALSE)
return(df.loc)
# I also tested return(createDataFrame(sqlContext, df.loc)) here
}
df <- read.df( # some stuff )
grp <- group_by(df, "id")
test <- agg(grp, "myfunc")
15/11/11 18:45:33 INFO scheduler.DAGScheduler: Job 2 finished: dfToCols at NativeMethodAccessorImpl.java:-2, took 0.463131 s
id
1 24
2 47
Beachten Sie, dass die Operationen inmyfunc
Alle funktionieren richtig, wenn ichfilter
der DataFrame auf ein einzelnesid
und starte es. Basierend auf der Zeit, die für die Ausführung benötigt wird (ca. 50 Sekunden pro Task) und der Tatsache, dass keine Ausnahmen ausgelöst werden, glaube ich,myfunc
wird in der Tat auf allen @ ausgefühid
s - aber ich brauche die Ausgabe!
Jede Eingabe wäre dankbar.