Транспонировать столбец в ряд с помощью Spark

Я пытаюсь переместить некоторые столбцы моей таблицы в строку. Я использую Python и Spark 1.5.0. Вот моя начальная таблица:

+-----+-----+-----+-------+
|  A  |col_1|col_2|col_...|
+-----+-------------------+
|  1  |  0.0|  0.6|  ...  |
|  2  |  0.6|  0.7|  ...  |
|  3  |  0.5|  0.9|  ...  |
|  ...|  ...|  ...|  ...  |

Я хотел бы иметь что-то вроде этого:

+-----+--------+-----------+
|  A  | col_id | col_value |
+-----+--------+-----------+
|  1  |   col_1|        0.0|
|  1  |   col_2|        0.6|   
|  ...|     ...|        ...|    
|  2  |   col_1|        0.6|
|  2  |   col_2|        0.7| 
|  ...|     ...|        ...|  
|  3  |   col_1|        0.5|
|  3  |   col_2|        0.9|
|  ...|     ...|        ...|

Кто-нибудь знает, как я могу это сделать? Спасибо за помощь.

 hi-zir11 мая 2018 г., 20:58

Ответы на вопрос(6)

Решение Вопроса

Это относительно просто сделать с основными функциями Spark SQL.

питон

from pyspark.sql.functions import array, col, explode, struct, lit

df = sc.parallelize([(1, 0.0, 0.6), (1, 0.6, 0.7)]).toDF(["A", "col_1", "col_2"])

def to_long(df, by):

    # Filter dtypes and split into column names and type description
    cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

to_long(df, ["A"])

Scala:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, col, explode, lit, struct}

val df = Seq((1, 0.0, 0.6), (1, 0.6, 0.7)).toDF("A", "col_1", "col_2")

def toLong(df: DataFrame, by: Seq[String]): DataFrame = {
  val (cols, types) = df.dtypes.filter{ case (c, _) => !by.contains(c)}.unzip
  require(types.distinct.size == 1, s"${types.distinct.toString}.length != 1")      

  val kvs = explode(array(
    cols.map(c => struct(lit(c).alias("key"), col(c).alias("val"))): _*
  ))

  val byExprs = by.map(col(_))

  df
    .select(byExprs :+ kvs.alias("_kvs"): _*)
    .select(byExprs ++ Seq($"_kvs.key", $"_kvs.val"): _*)
}

toLong(df, Seq("A"))
 Raphael Roth27 янв. 2017 г., 09:15
Я не думаю, что это "относительно" просто :)
 vvp30 апр. 2019 г., 07:49
Как написать это на Java?
 Shekhar Koirala10 окт. 2018 г., 07:37
я понял ошибкуAssertionError: All columns have to be of the same type

Я взял ответ Scala, который написал @javadba, и создал версию на Python для транспонирования всех столбцов вDataFrame, Это может немного отличаться от того, что спрашивал ОП ...

from itertools import chain
from pyspark.sql import DataFrame


def _sort_transpose_tuple(tup):
    x, y = tup
    return x, tuple(zip(*sorted(y, key=lambda v_k: v_k[1], reverse=False)))[0]


def transpose(X):
    """Transpose a PySpark DataFrame.

    Parameters
    ----------
    X : PySpark ``DataFrame``
        The ``DataFrame`` that should be tranposed.
    """
    # validate
    if not isinstance(X, DataFrame):
        raise TypeError('X should be a DataFrame, not a %s' 
                        % type(X))

    cols = X.columns
    n_features = len(cols)

    # Sorry for this unreadability...
    return X.rdd.flatMap( # make into an RDD
        lambda xs: chain(xs)).zipWithIndex().groupBy( # zip index
        lambda val_idx: val_idx[1] % n_features).sortBy( # group by index % n_features as key
        lambda grp_res: grp_res[0]).map( # sort by index % n_features key
        lambda grp_res: _sort_transpose_tuple(grp_res)).map( # maintain order
        lambda key_col: key_col[1]).toDF() # return to DF

Например:

>>> X = sc.parallelize([(1,2,3), (4,5,6), (7,8,9)]).toDF()
>>> X.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+

>>> transpose(X).show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
|  1|  4|  7|
|  2|  5|  8|
|  3|  6|  9|
+---+---+---+

Библиотеки локальной линейной алгебры Spark в настоящее время очень слабы и не включают базовых операций, как указано выше.

Существует исправление JIRA для Spark 2.1, но это не поможетсегодня.

Что следует учитывать: выполнение транспонирования, вероятно, потребует полной перестановки данных.

На данный момент вам нужно будет написать код RDD напрямую. Я написалtranspose в Scala - но не в Python. Здесьscala версия:

 def transpose(mat: DMatrix) = {
    val nCols = mat(0).length
    val matT = mat
      .flatten
      .zipWithIndex
      .groupBy {
      _._2 % nCols
    }
      .toSeq.sortBy {
      _._1
    }
      .map(_._2)
      .map(_.map(_._1))
      .toArray
    matT
  }

Таким образом, вы можете преобразовать это в Python для вашего использования. У меня нет пропускной способности для записи / проверки этого в данный момент: дайте мне знать, если вы не смогли сделать это преобразование.

По крайней мере, следующие легко конвертируются вpython.

zipWithIndex ->enumerate() (эквивалент Python - кредит на @ zero323)map ->[someOperation(x) for x in ..]groupBy ->itertools.groupBy()

Вот реализация дляflatten который не имеет эквивалента Python:

  def flatten(L):
        for item in L:
            try:
                for i in flatten(item):
                    yield i
            except TypeError:
                yield item

Таким образом, вы должны быть в состоянии собрать их вместе для решения.

 zero32316 июн. 2016 г., 20:44
zipWithIndex ->enumerate() (Эквивалент Python)?
 javadba16 июн. 2016 г., 21:57
msgstr "не перемещает данные так сильно". Скорее всего, правда. Мой код представляет собой серию преобразований, вызывающих перемешивание. Более подробно рассмотрим ваш подход с точки зрения «сохранить существующую схему разбиения». Обновление: да, у вас меньше шагов xform - так быстрее и, вероятно, легче понять.
 javadba16 июн. 2016 г., 20:51
@ zero323 Хорошие глаза! Я собираюсь объявить ваш хороший ответ, кстати.
 Raouf16 июн. 2016 г., 19:23
Спасибо за ваш ответ. Я не знаю scala, но я постараюсь понять ваш код. Я буду держать вас в курсе.
 zero32316 июн. 2016 г., 20:56
Благодарю. Это немного более многословно, но не так сильно перемещает данные.
 javadba16 июн. 2016 г., 19:30
@Raouf Код выше всего имеет эквиваленты в Python. Если вы хорошо знаете Python, проблем не должно быть. Я показалflatten который является единственным отсутствующим в питоне. Дай мне знать ;)

Один из способов решить сpyspark sql используя функцииcreate_map а такжеexplode.

from pyspark.sql import functions as func
#Use `create_map` to create the map of columns with constant 
df = df.withColumn('mapCol', \
                    func.create_map(func.lit('col_1'),df.col_1,
                                    func.lit('col_2'),df.col_2,
                                    func.lit('col_3'),df.col_3
                                   ) 
                  )
#Use explode function to explode the map 
res = df.select('*',func.explode(df.mapCol).alias('col_id','col_value'))
res.show()

Очень удобный способ реализации:

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA , 'colID' : k, 'colValue' : row[k]})

    newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander)

Используйте flatmap. Что-то вроде ниже должно работать

from pyspark.sql import Row

def rowExpander(row):
    rowDict = row.asDict()
    valA = rowDict.pop('A')
    for k in rowDict:
        yield Row(**{'A': valA , 'colID': k, 'colValue': row[k]})

newDf = sqlContext.createDataFrame(df.rdd.flatMap(rowExpander))
 Raouf16 июн. 2016 г., 19:19
Спасибо за ваш ответ. Но это не работает. Вот сообщение об ошибке, которое я получаю:Ошибка типа: индексы кортежа должны быть целыми числами, а не

Ваш ответ на вопрос