PySpark: вычислить максимум строки подмножества столбцов и добавить к существующему фрейму данных

Я хотел бы вычислить максимум поднабора столбцов для каждой строки и добавить его в качестве нового столбца для существующегоDataframe.

Мне удалось сделать это очень неловко:

def add_colmax(df,subset_columns,colnm):
     '''
     calculate the maximum of the selected "subset_columns" from dataframe df for each row, 
     new column containing row wise maximum is added to dataframe df. 

     df: dataframe. It must contain subset_columns as subset of columns
     colnm: Name of the new column containing row-wise maximum of subset_columns
     subset_columns: the subset of columns from w
     '''
     from pyspark.sql.functions import monotonicallyIncreasingId
     from pyspark.sql import Row
     def get_max_row_with_None(row):
         return float(np.max(row))

     df_subset = df.select(subset_columns)
     rdd = df_subset.map( get_max_row_with_None)
     df_rowsum = rdd.map(Row(colnm)).toDF()
     df_rowsum = df_rowsum.withColumn("id",monotonicallyIncreasingId())
     df = df.withColumn("id",monotonicallyIncreasingId())
     df = df.join(df_rowsum,df.id == df_rowsum.id).drop(df.id).drop(df_rowsum.id)
     return df

Эта функция работает как:

rdd1 =  sc.parallelize([("foo", 1.0,3.0,None), 
                    ("bar", 2.0,2.0,-10), 
                    ("baz", 3.3,1.2,10.0)])


df1 = sqlContext.createDataFrame(rdd1, ('v1', 'v2','v3','v4'))
df_new = add_colmax(df1,['v2','v3','v4'],"rowsum")   
df_new.collect()

возвращает:

 [Row(v1=u'bar', v2=2.0, v3=2.0, v4=-10, rowsum=2.0),
  Row(v1=u'baz', v2=3.3, v3=1.2, v4=None, rowsum=3.3),
  Row(v1=u'foo', v2=1.0, v3=3.0, v4=None, rowsum=3.0)]

Я думаю, что если бы я мог использовать пользовательские функции сwithColumnэто можно сделать гораздо проще. Но я не мог понять, как это сделать. Пожалуйста, дайте мне знать, если у вас есть более простой способ добиться этого. Я использую Spark 1.6

Ответы на вопрос(1)

Ваш ответ на вопрос