Как заставить Spark Streaming записывать свои выходные данные, чтобы Impala могла их прочитать?

У меня есть следующая проблема с Spark Streaming API. В настоящее время я передаю входные данные через Flume в Spark Streaming, с помощью которого я планирую выполнить некоторую предварительную обработку данных. Затем я хотел бы сохранить данные в файловой системе Hadoop и запросить их с помощью Impala. Однако Spark записывает файлы данных в отдельные каталоги, и для каждого СДР создается новый каталог.

Это проблема, потому что, во-первых, внешние таблицы в Impala не могут обнаружить подкаталоги, а только файлы внутри каталога, на который они указывают, если они не разделены. Во-вторых, Spark настолько быстро добавляет новые каталоги, что для производительности будет очень плохо периодически создавать новый раздел в Impala для каждого сгенерированного каталога. С другой стороны, если я решу увеличить интервал прокрутки записей в Spark, чтобы каталоги создавались реже, добавится задержка, пока Impala не сможет прочитать входящие данные. Это неприемлемо, поскольку моя система должна поддерживать приложения реального времени. В Hive я мог настроить внешние таблицы так, чтобы они также определяли подкаталоги без необходимости их разбиения, используя следующие параметры:

set hive.mapred.supports.subdirectories=true;
set mapred.input.dir.recursive=true;

Но, насколько я понимаю, Импала не имеет такой функции, как эта.

В настоящее время я использую следующий код для чтения данных из Flume и записи их в HDFS:

val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8"))).saveAsTextFiles(path)

Здесь переменная path определяет префикс каталога, к которому добавляются текстовые файлы (part-0000 и т. Д.), А остальная часть имени каталога представляет собой метку времени, сгенерированную Spark. Я мог бы изменить код на что-то вроде этого:

val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
val mapStream = stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8")))
mapStream.foreachRDD(rdd => rdd.saveAsTextFile(path))

В этом случае файлы будут добавлены в один и тот же каталог, определенный путем, но, поскольку они всегда называются part-00000, part-00001, part-00002 и т. Д., Ранее созданные файлы будут перезаписаны. При изучении исходного кода Spark я заметил, что имена файлов определяются строкой в ​​методе open () SparkHadoopWriter:

val outputName = "part-"  + numfmt.format(splitID)

И мне кажется, что нет способа манипулировать splitID через Spark API. Подводя итог, мои вопросы следующие:

Есть ли способ заставить внешние таблицы в Impala обнаруживать подкаталоги?Если нет, есть ли способ заставить Spark записывать свои выходные файлы в один каталог или иным образом в форме, которая мгновенно читается Impala?Если нет, то ожидается ли какое-либо обновление со Spark, чтобы исправить эту проблему, или я должен просто разветвлять свою собственную версию Spark, с помощью которой я могу сам определять имена файлов, которые он пишет?

Ответы на вопрос(1)

Ваш ответ на вопрос