Сравнение столбцов в Писпарке

Я работаю над PySpark DataFrame с n столбцами. У меня есть набор из m столбцов (m <n), и моя задача состоит в том, чтобы выбрать столбец с максимальными значениями в нем.

Например:

Входные данные: PySpark DataFrame, содержащий col_1 = [1,2,3], col_2 = [2,1,4], col_3 = [3,2,5].

Ouput = col_4 = max (col1, col_2, col_3) = [3,2,5] в этом примере.

В пандах есть нечто подобное, как объяснено вэтот вопрос.

Есть ли способ сделать это в PySpark, или я должен изменить конвертирование PySpark df в Pandas df и затем выполнить операции?

 Quetzalcoatl22 сент. 2018 г., 23:17
если вопрос заключается в получении максимального значения для каждого столбца, то похоже, что ожидаемый результат должен быть [max (col_1), max (col_2), max (col_3)] = [3, 4, 5]

Ответы на вопрос(4)

Решение Вопроса

from pyspark.sql.functions import max as max_, col, when
from functools import reduce

def row_max(*cols):
    return reduce(
        lambda x, y: when(x > y, x).otherwise(y),
        [col(c) if isinstance(c, str) else c for c in cols]
    )

df = (sc.parallelize([(1, 2, 3), (2, 1, 2), (3, 4, 5)])
    .toDF(["a", "b", "c"]))

df.select(row_max("a", "b", "c").alias("max")))

Spark 1.5+ также обеспечиваетleast, greatest

from pyspark.sql.functions import greatest

df.select(greatest("a", "b", "c"))

Если вы хотите сохранить имя максимума, вы можете использовать `structs:

from pyspark.sql.functions import struct, lit

def row_max_with_name(*cols):
    cols_ = [struct(col(c).alias("value"), lit(c).alias("col")) for c in cols]
    return greatest(*cols_).alias("greatest({0})".format(",".join(cols)))

 maxs = df.select(row_max_with_name("a", "b", "c").alias("maxs"))

И, наконец, вы можете использовать выше, чтобы найти выбрать "верхний" столбец:

from pyspark.sql.functions import max

((_, c), ) = (maxs
    .groupBy(col("maxs")["col"].alias("col"))
    .count()
    .agg(max(struct(col("count"), col("col"))))
    .first())

df.select(c)
 user156934107 авг. 2019 г., 18:28
это очень полезно! как найти второго по величине вместо этого? Я хочу получить имя второго по величине столбца

df = sc.parallelize(Seq((10, 10, 1 ), (200, 2, 20), (3, 30, 300), (400, 40, 4))).toDF("c1", "c2", "c3"))  

df.rdd.map(row=>List[String](row(0).toString,row(1).toString,row(2).toString)).map(x=>(x(0),x(1),x(2),x.min)).toDF("c1","c2","c3","min").show    

+---+---+---+---+  
| c1| c2| c3|min|  
+---+---+---+---+  
| 10| 10|  1|  1|    
|200|  2| 20|  2|  
|  3| 30|300|  3|  
|400| 40|  4|  4|  
+---+---+---+---+  

что нижеdf ваш фрейм данных

df = sc.parallelize([(10, 10, 1 ), (200, 2, 20), (3, 30, 300), (400, 40, 4)]).toDF(["c1", "c2", "c3"])
df.show()

+---+---+---+
| c1| c2| c3|
+---+---+---+
| 10| 10|  1|
|200|  2| 20|
|  3| 30|300|
|400| 40|  4|
+---+---+---+

Вы можете обработать вышеупомянутый df как ниже, чтобы получить желаемые результаты

from pyspark.sql.functions import lit, min

df.select( lit('c1').alias('cn1'), min(df.c1).alias('c1'),
           lit('c2').alias('cn2'), min(df.c2).alias('c2'),
           lit('c3').alias('cn3'), min(df.c3).alias('c3')
          )\
         .rdd.flatMap(lambda r: [ (r.cn1, r.c1), (r.cn2, r.c2), (r.cn3, r.c3)])\
         .toDF(['Columnn', 'Min']).show()

+-------+---+
|Columnn|Min|
+-------+---+
|     c1|  3|
|     c2|  2|
|     c3|  1|
+-------+---+
 Hemant29 мар. 2017 г., 15:19
Вы делаете min (col1), тогда как я хочу min (row1), min (row2) .. и так далее ...

Вы также можете использовать встроенный в pysparkleast:

from pyspark.sql.functions import least, col
df = df.withColumn('min', least(col('c1'), col('c2'), col('c3')))

Ваш ответ на вопрос