Como fazer com que o Spark Streaming escreva sua saída para que o Impala possa lê-la?

Eu tenho o seguinte problema com a API de streaming do Spark. Atualmente, estou transmitindo dados de entrada via Flume para Spark Streaming, com o qual pretendo fazer um pré-processamento para os dados. Em seguida, gostaria de salvar os dados no sistema de arquivos do Hadoop e consultá-los com o Impala. No entanto, o Spark está gravando os arquivos de dados em diretórios separados e um novo diretório é gerado para cada RDD.

Isso é um problema porque, em primeiro lugar, as tabelas externas no Impala não podem detectar subdiretórios, mas apenas arquivos, dentro do diretório para o qual estão apontando, a menos que particionados. Em segundo lugar, os novos diretórios são adicionados tão rapidamente pelo Spark que seria muito ruim para o desempenho criar uma nova partição periodicamente no Impala para cada diretório gerado. Por outro lado, se eu optar por aumentar o intervalo de rolagem das gravações no Spark, para que os diretórios sejam gerados com menos frequência, haverá um atraso adicional até que o Impala possa ler os dados recebidos. Isso não é aceitável, pois meu sistema precisa suportar aplicativos em tempo real. No Hive, eu poderia configurar as tabelas externas para também detectar os subdiretórios sem a necessidade de particionar, usando estas configurações:

set hive.mapred.supports.subdirectories=true;
set mapred.input.dir.recursive=true;

Mas, no meu entender, o Impala não possui um recurso como esse.

Atualmente, estou usando o seguinte código para ler os dados do Flume e gravá-los no HDFS:

val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8"))).saveAsTextFiles(path)

Aqui, o caminho da variável determina o prefixo do diretório ao qual os arquivos de texto (parte-0000 e assim por diante) são adicionados e o restante do nome do diretório é um carimbo de data e hora gerado pelo Spark. Eu poderia mudar o código para algo assim:

val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)
val mapStream = stream.map(event => new String(event.event.getBody().array(), Charset.forName("UTF-8")))
mapStream.foreachRDD(rdd => rdd.saveAsTextFile(path))

Nesse caso, os arquivos serão adicionados ao mesmo diretório determinado pelo caminho, mas, como sempre são nomeados como parte-00000, parte-00001, parte-00002, etc., os arquivos gerados anteriormente serão substituídos. Ao examinar o código fonte do Spark, notei que os nomes dos arquivos são determinados por uma linha no método open () do SparkHadoopWriter:

val outputName = "part-"  + numfmt.format(splitID)

E parece-me que não há como manipular splitID através da API Spark. Para resumir, minhas perguntas são as seguintes:

Existe algum método para fazer com que as tabelas externas no Impala detectem subdiretórios?Caso contrário, existe algum método para fazer o Spark gravar seus arquivos de saída em um único diretório ou de outra forma em um formato que seja instantaneamente legível pelo Impala?Caso contrário, há algum tipo de atualização esperada com o Spark para corrigir esse problema ou devo apenas ramificar minha própria versão do Spark com a qual posso decidir os nomes dos arquivos que ele mesmo gravará?

questionAnswers(1)

yourAnswerToTheQuestion