Объединить выходные файлы CSV Spark с одним заголовком
Я хочу создать конвейер обработки данных в AWS, чтобы в конечном итоге использовать обработанные данные для машинного обучения.
У меня есть скрипт Scala, который берет необработанные данные из S3, обрабатывает их и записывает их в HDFS или даже S3 сСпарк-CSV, Я думаю, что я могу использовать несколько файлов в качестве ввода, если я хочу использоватьAWS Машинное обучение инструмент для обучения модели прогнозирования. Но если я хочу использовать что-то еще, я предполагаю, что будет лучше, если я получу один выходной файл CSV.
В настоящее время, как я не хочу использоватьпередел (1) никоалесценции (1) в целях производительности я использовалhadoop fs -getmerge для ручного тестирования, но так как он просто объединяет содержимое выходных файлов задания, я столкнулся с небольшой проблемой. я нуждаюсьодин ряд заголовков в файле данных для обучения модели прогнозирования.
Если я использую.option("header","true")
для spark-csv, он записывает заголовки в каждый выходной файл, и после объединения у меня столько строк заголовков в данных, сколько было выходных файлов. Но если опция заголовка ложна, то она не добавляет никаких заголовков.
Теперь я нашел возможность объединить файлы внутри скрипта Scala с Hadoop APIFileUtil.copyMerge
, Я попробовал это вspark-shell
с кодом ниже.
import org.apache.hadoop.fs.FileUtil
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
val configuration = new Configuration();
val fs = FileSystem.get(configuration);
FileUtil.copyMerge(fs, new Path("smallheaders"), fs, new Path("/home/hadoop/smallheaders2"), false, configuration, "")
Но это решение все еще просто объединяет файлы друг с другом и не обрабатывает заголовки.Как я могу получить выходной файл только с одной строкой заголовков?
Я даже пытался добавитьdf.columns.mkString(",")
в качестве последнего аргумента дляcopyMerge
, но это добавляло заголовки еще несколько раз, а не один раз.