Объединить выходные файлы CSV Spark с одним заголовком

Я хочу создать конвейер обработки данных в AWS, чтобы в конечном итоге использовать обработанные данные для машинного обучения.

У меня есть скрипт Scala, который берет необработанные данные из S3, обрабатывает их и записывает их в HDFS или даже S3 сСпарк-CSV, Я думаю, что я могу использовать несколько файлов в качестве ввода, если я хочу использоватьAWS Машинное обучение инструмент для обучения модели прогнозирования. Но если я хочу использовать что-то еще, я предполагаю, что будет лучше, если я получу один выходной файл CSV.

В настоящее время, как я не хочу использоватьпередел (1) никоалесценции (1) в целях производительности я использовалhadoop fs -getmerge для ручного тестирования, но так как он просто объединяет содержимое выходных файлов задания, я столкнулся с небольшой проблемой. я нуждаюсьодин ряд заголовков в файле данных для обучения модели прогнозирования.

Если я использую.option("header","true") для spark-csv, он записывает заголовки в каждый выходной файл, и после объединения у меня столько строк заголовков в данных, сколько было выходных файлов. Но если опция заголовка ложна, то она не добавляет никаких заголовков.

Теперь я нашел возможность объединить файлы внутри скрипта Scala с Hadoop APIFileUtil.copyMerge, Я попробовал это вspark-shell с кодом ниже.

import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")

Но это решение все еще просто объединяет файлы друг с другом и не обрабатывает заголовки.Как я могу получить выходной файл только с одной строкой заголовков?

Я даже пытался добавитьdf.columns.mkString(",") в качестве последнего аргумента дляcopyMerge, но это добавляло заголовки еще несколько раз, а не один раз.

 Kyle Heuton03 апр. 2018 г., 18:21
@belka это не разные кадры данных с разными столбцами, это просто разные разделы одного и того же кадра данных с одинаковыми столбцами
 Boern09 мар. 2017 г., 16:27
как насчет фильтрации DataFrame в нулевые строки, экспортировать это с header = true, экспортировать остальные данные с header = false и затем объединить заголовок с разделами?
 V. Samma10 мар. 2017 г., 08:55
В настоящее время я занят другим проектом, но если я вернусь к нему, то запомню этот вопрос.
 belka16 мар. 2018 г., 09:41
@ Boern + all: мой ответ не решает проблему?
 V. Samma06 янв. 2017 г., 15:08
@senthilkumarp К сожалению, нет. Единственный способ получить нужный результат - использоватьdf.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save(resultPath) но это, вероятно, не будет работать так хорошо с большими данными.
 belka26 мар. 2018 г., 18:55
@KyleHeuton, как вы могли бы записать данные нескольких рабочих, но иметь один файл в качестве вывода? Вы должны выполнить выборку всех данных в драйвер ... Я подвергаю сомнению тот факт, что мое решение записывает заголовок несколько раз.
 A. Rabus04 мая 2017 г., 11:35
Если файлы не такие большие, я использую оболочку для удаления и повторного добавления строки заголовка: head -1 getmerge.csv> header; grep -v "header1, header2," getmerge.csv> tmpcsv # или все, что только apperas в заголовках; заголовок кошки tmpcsv> my.csv; rm getmerge.csv # очистить
 Boern09 мар. 2017 г., 16:46
Если вы найдете элегантное решение, пожалуйста, дайте мне знать! У меня часто возникала одна и та же проблема, и я, безусловно, поддерживал.
 senthil kumar p06 янв. 2017 г., 16:58
да, я тоже использовал ту же коалесцию (1)
 belka03 апр. 2018 г., 10:13
@KyleHeuton На самом деле нет способа убедиться, что порядок столбцов в первомpart-000 файл (файл с указанным заголовком) такой же, как во втором (или третьем ...) файлеpart-000, Потому что Спарк не знает какunion разные DataFrames с разными именами столбцов.
 Kyle Heuton26 мар. 2018 г., 18:24
@belka, похоже, ваше первое решение - то, что в настоящее время имеет В. Самма, но несколько раз помещает заголовок в файл. Ваши другие решения используютcoalesce а такжеrepartition что тоже не идеально. Хорошее решение будет по-прежнему иметь данные, написанные разными работниками, но создать один файл с 1 заголовком
 senthil kumar p06 янв. 2017 г., 09:44
Я также столкнулся с той же проблемой. Это исправлено?
 V. Samma09 мар. 2017 г., 16:44
@ Берн это может сработать. Хотя я думаю, что для этого потребуется скопировать файл заголовков в тот же вывод, что и данные, и убедиться, что это всегда первый файл. Я думаю, что это текущее решение не позволит писать по тому же пути. Конечно, добавление может решить эту проблему, нужно попытаться поиграть с этим некоторое время.
 Kyle Heuton26 мар. 2018 г., 19:28
@bleka «Как» в этом суть этого вопроса. Можно представить себе флаг, который зажигает, который говорит, что нужно только сохранить заголовок с файлом, обозначеннымpart-0000или, возможно, интеллектуальная конкатенация, которая объединяет файлы, сохраненные несколькими работниками, но сохраняет заголовок только от одного из них.copyMerge похоже, он просто объединяет файлы, поэтому, если у файлов есть заголовки, заголовок будет появляться несколько раз, или если у файлов нет заголовков, заголовок вообще не будет, как говорит В. Самма в своем вопросе. Или делаетcopyMerge есть другое поведение в вашем ответе?
 V. Samma02 апр. 2018 г., 17:44
@belka К сожалению, прошло уже почти 2 года с тех пор, как я опубликовал это, и я больше не работаю со Spark, но я помню проблему именно так, как описал @Kyle Heuton. Проблема заключалась в том, что если на выходе были указаны несколько файлов, вы можете добавить заголовок ко всем или ни к одному из них. И затем, объединяя их, у вас будет несколько заголовков, разбросанных по данным, или их вообще не будет. Хотя я думаюcoalesce исправил, думаю, я помню, что это было медленнее, чем слияние, поэтому в идеале мог бы быть способ записи только заголовков в 1 файл, и при слиянии используйте это как первый файл.

Ответы на вопрос(5)

 // Convert JavaRDD  to CSV and save as text file
        outputDataframe.write()
                .format("com.databricks.spark.csv")
                // Header => true, will enable to have header in each file
                .option("header", "true")

Пожалуйста, перейдите по ссылке с интеграционным тестом о том, как написать один заголовок

http://bytepadding.com/big-data/spark/write-a-csv-text-file-from-spark/

Выведите заголовок, используя dataframe.schema (val header = dataDF.schema.fieldNames.reduce (_ + "," + _))создайте файл с заголовком на dsefsдобавьте все файлы разделов (без заголовка) в файл в # 2 с помощью API файловой системы hadoop

Попробуйте указать схему заголовка и прочитать все файлы из папки, используя опцию drop, неправильно сформированную из spark-csv. Это должно позволить вам читать все файлы в папке, сохраняя только заголовки (потому что вы отбрасываете неправильный формат). Пример:

val headerSchema = List(
  StructField("example1", StringType, true),
  StructField("example2", StringType, true),
  StructField("example3", StringType, true)
)

val header_DF =sqlCtx.read
  .option("delimiter", ",")
  .option("header", "false")
  .option("mode","DROPMALFORMED")
  .option("inferSchema","false")
  .schema(StructType(headerSchema))
  .format("com.databricks.spark.csv")
  .load("folder containg the files")

В header_DF у вас будут только строки заголовков, отсюда вы можете трансформировать фрейм данных так, как вам нужно.

 V. Samma02 апр. 2018 г., 17:37
Хотя я больше не работаю со Spark, у меня нет системы, где я мог бы это проверить, ваш пример может очень хорошо работать с небольшими структурами. Но у меня был случай, когда у нас было 200-300 полей в одной структуре (структуры, вложенные в другие структуры и т. Д.), И эта схема менялась довольно быстро по мере изменения системы. Поэтому определение схемы вручную не было возможным вариантом.

Вы можете ходить вокруг так.

1. Создайте новый DataFrame (headerDF), содержащий имена заголовков.2. Соедините его с DataFrame (dataDF), содержащим данные.3. Выведите объединенный DataFrame на диск сопция ("заголовок", "ложь").4. объединить файлы разделов (part-0000 ** 0.csv), используя hadoop FileUtil

Таким образом, все разделы не имеют заголовка, за исключением того, что содержимое одного раздела имеет строку имен заголовков из headerDF. Когда все разделы объединены, в верхней части файла появляется один заголовок. Пример кода следующий

  //dataFrame is the data to save on disk
  //cast types of all columns to String
  val dataDF = dataFrame.select(dataFrame.columns.map(c => dataFrame.col(c).cast("string")): _*)

  //create a new data frame containing only header names
  import scala.collection.JavaConverters._
  val headerDF = sparkSession.createDataFrame(List(Row.fromSeq(dataDF.columns.toSeq)).asJava, dataDF.schema)

  //merge header names with data
  headerDF.union(dataDF).write.mode(SaveMode.Overwrite).option("header", "false").csv(outputFolder)

  //use hadoop FileUtil to merge all partition csv files into a single file
  val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
  FileUtil.copyMerge(fs, new Path(outputFolder), fs, new Path("/folder/target.csv"), true, spark.sparkContext.hadoopConfiguration, null)
 kn3l07 дек. 2018 г., 01:46
Вы можете помочь проверить этоstackoverflow.com/questions/53633786/... @Kang

Чтобы объединить файлы в папке в один файл:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._

def merge(srcPath: String, dstPath: String): Unit =  {
  val hadoopConfig = new Configuration()
  val hdfs = FileSystem.get(hadoopConfig)
  FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
}

Если вы хотите объединить все файлы в один файл, но все еще в одной папке (но это приводит все данные к узлу драйвера):

dataFrame
      .coalesce(1)
      .write
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .save(out)

Другое решение состоит в том, чтобы использовать решение № 2, а затем переместить один файл в папке в другой путь (с именем нашего CSV-файла).

def df2csv(df: DataFrame, fileName: String, sep: String = ",", header: Boolean = false): Unit = {
    val tmpDir = "tmpDir"

    df.repartition(1)
      .write
      .format("com.databricks.spark.csv")
      .option("header", header.toString)
      .option("delimiter", sep)
      .save(tmpDir)

    val dir = new File(tmpDir)
    val tmpCsvFile = tmpDir + File.separatorChar + "part-00000"
    (new File(tmpCsvFile)).renameTo(new File(fileName))

    dir.listFiles.foreach( f => f.delete )
    dir.delete
}

Ваш ответ на вопрос