Вы создаете свою схему .// Пример: val innerSchema = StructType (Array (StructField ("value", StringType), StructField ("count", LongType)))

учаю твиты из темы кафки с Avro (сериализатор и десериализатор). Затем я создаю искровой потребитель, который извлекает твиты в Dstream of RDD [GenericRecord]. Теперь я хочу преобразовать каждый rdd в фрейм данных для анализа этих твитов с помощью SQL. Любое решение для преобразования RDD [GenericRecord] в dataframe, пожалуйста?

 Ramesh Maharjan13 нояб. 2017 г., 16:29
Вы можете обновить некоторые данные RDD [GenericRecord], выполнив foreach (println)?

Ответы на вопрос(4)

Хотя что-то вроде этого может помочь вам,

val stream = ...

val dfStream = stream.transform(rdd:RDD[GenericRecord]=>{
     val df = rdd.map(_.toSeq)
              .map(seq=> Row.fromSeq(seq))
              .toDF(col1,col2, ....)

     df
})

Я хотел бы предложить вам альтернативный подход. С Spark 2.x вы можете пропустить весь процесс созданияDStreams, Вместо этого вы можете сделать что-то подобное со структурированным потоковым

val df = ss.readStream
  .format("com.databricks.spark.avro")
  .load("/path/to/files")

Это даст вам один фрейм данных, который вы можете напрямую запросить. Вот,ss это пример искровой сессии./path/to/files это место, где все ваши avro файлы выгружаются из kafka.

PS: Вам может понадобиться импортироватьspark-avro

libraryDependencies += "com.databricks" %% "spark-avro" % "4.0.0"

Надеюсь, это помогло. ура

 Slim AZAIZ14 нояб. 2017 г., 09:20
У меня нет файлов, я хочу преобразовать RDD [GenericRecord] в dataframe

https://stackoverflow.com/a/48828303/5957143 а такжеhttps://stackoverflow.com/a/47267060/5957143 работает для меня.

Я использовал следующее для создания MySchemaConversions

package com.databricks.spark.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType

object MySchemaConversions {
  def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
    SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
}

А потом я использовал

val myAvroType = SchemaConverters.toSqlType(schema).dataType
val myAvroRecordConverter = MySchemaConversions.createConverterToSQL(schema, myAvroType)

// unionedResultRdd is unionRDD [GenericRecord]

var rowRDD = unionedResultRdd.map(record => MyObject.myConverter(record, myAvroRecordConverter))
 val df = sparkSession.createDataFrame(rowRDD , myAvroType.asInstanceOf[StructType])

Преимущество наличия myConverter в объекте MyObject заключается в том, что вы не столкнетесь с проблемами сериализации (java.io.NotSerializableException).

object MyObject{
    def myConverter(record: GenericRecord,
        myAvroRecordConverter: (GenericRecord) => Row): Row =
            myAvroRecordConverter.apply(record)
}
Решение Вопроса

пытаясь заставить эту работу (особенно, как правильно десериализовать данные, но, похоже, вы уже рассмотрели это) ... ОБНОВЛЕНО

  //Define function to convert from GenericRecord to Row
  def genericRecordToRow(record: GenericRecord, sqlType : SchemaConverters.SchemaType): Row = {
    val objectArray = new Array[Any](record.asInstanceOf[GenericRecord].getSchema.getFields.size)
    import scala.collection.JavaConversions._
    for (field <- record.getSchema.getFields) {
      objectArray(field.pos) = record.get(field.pos)
    }

    new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
  }

//Inside your stream foreachRDD
val yourGenericRecordRDD = ... 
val schema = new Schema.Parser().parse(...) // your schema
val sqlType = SchemaConverters.toSqlType(new Schema.Parser().parse(strSchema))

var rowRDD = yourGeneircRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD , sqlType.dataType.asInstanceOf[StructType])

Как вы видите, я использую SchemaConverter для получения структуры dataframe из схемы, которую вы использовали для десериализации (это может быть более болезненно с реестром схемы). Для этого вам нужна следующая зависимость

    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
    </dependency>

вам нужно будет изменить свою версию искры в зависимости от вашей.

ОБНОВЛЕНИЕ: приведенный выше код работает только дляплоский авро схемы.

Завложенными структуры я использовал что-то другое. Вы можете скопировать классSchemaConvertersдолжно быть внутриcom.databricks.spark.avro (он использует некоторые защищенные классы из пакета databricks) или вы можете попробовать использоватьискровым BigQuery зависимость. Класс не будет доступен по умолчанию, поэтому вам нужно будет создать класс внутри пакетаcom.databricks.spark.avro чтобы получить доступ к заводскому методу.

package com.databricks.spark.avro

import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

class SchemaConverterUtils {

  def converterSql(schema : Schema, sqlType : StructType) = {
    createConverterToSQL(schema, sqlType)
  }

}

После этого вы сможете преобразовать данные как

val schema = .. // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
....
//inside foreach RDD
var genericRecordRDD = deserializeAvroData(rdd)
/// 
var converter = SchemaConverterUtils.converterSql(schema, sqlType)
... 
val rowRdd = genericRecordRDD.flatMap(record => {
        Try(converter(record).asInstanceOf[Row]).toOption
      })
//To DataFrame
 val df = sqlContext.createDataFrame(rowRdd, sqlType)
 Slim AZAIZ14 нояб. 2017 г., 14:26
спасибо человек это работает
 hlagos14 нояб. 2017 г., 14:24
обновлен порядок преобразования GenericRecord в RDD
 hlagos14 нояб. 2017 г., 14:32
уш! просто убедитесь, что вы просматриваете мое последнее изменение .. Я пропустил передачу значений в объект objectArray перед созданием необработанных
 Slim AZAIZ14 нояб. 2017 г., 09:33
метод createDataFrame нужен в качестве аргументов RDD [ROW] и structType, но у меня в моем случае createDataFramei есть RDD [GenericRecord]

RDD [Row], schema: StructType), который доступен в объекте SQLContext. Пример для преобразования RDD старого DataFrame:

import sqlContext.implicits.
val rdd = oldDF.rdd
val newDF = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

Обратите внимание, что нет необходимости явно устанавливать любой столбец схемы. Мы повторно используем старую схему DF, которая имеет класс StructType и может быть легко расширена. Однако такой подход иногда невозможен, а в некоторых случаях может быть менее эффективным, чем первый.

 Saghe Achraf13 нояб. 2017 г., 15:47
Вы создаете свою схему .// Пример: val innerSchema = StructType (Array (StructField ("value", StringType), StructField ("count", LongType)))
 Slim AZAIZ13 нояб. 2017 г., 15:21
У меня нет старого кадра данных. У меня есть только RDD [GenericRecord]

Ваш ответ на вопрос