¿Cómo puedo particionar los RDD de pyspark con funciones R?
import rpy2.robjects as robjects
dffunc = sc.parallelize([(0,robjects.r.rnorm),(1,robjects.r.runif)])
dffunc.collect()
Salidas
[(0, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc28618 / R:0x26abd18>), (1, <rpy2.rinterface.SexpClosure - Python:0x7f2ecfc283d8 / R:0x26aad28>)]
Mientras que la versión particionada produce un error:
dffuncpart = dffunc.partitionBy(2)
dffuncpart.collect()
RuntimeError: ('R cannot evaluate code before being initialized.', <built-in function unserialize>
Parece que este error es queR
no se cargó en una de las particiones, lo que supongo que implica que no se realizó el primer paso de importación. ¿Hay alguna forma de evitar esto?
EDITAR 1 Este segundo ejemplo me hace pensar que hay un error en el momento de pyspark o rpy2.
dffunc = sc.parallelize([(0,robjects.r.rnorm), (1,robjects.r.runif)]).partitionBy(2)
def loadmodel(model):
import rpy2.robjects as robjects
return model[1](2)
dffunc.map(loadmodel).collect()
Produce el mismo error R no puede evaluar el código antes de ser inicializado.
dffuncpickle = sc.parallelize([(0,pickle.dumps(robjects.r.rnorm)),(1,pickle.dumps(robjects.r.runif))]).partitionBy(2)
def loadmodelpickle(model):
import rpy2.robjects as robjects
import pickle
return pickle.loads(model[1])(2)
dffuncpickle.map(loadmodelpickle).collect()
Funciona tal como se esperaba.