Como evitar a leitura de arquivos antigos do S3 ao anexar novos dados?

De duas em duas horas, o trabalho do spark está em execução para converter alguns arquivos tgz em parquet. O trabalho anexa os novos dados a um parquet existente no s3:

df.write.mode("append").partitionBy("id","day").parquet("s3://myBucket/foo.parquet")

Na saída de envio de faísca, vejo que um tempo significativo está sendo gasto na leitura de arquivos antigos de parquet, por exemplo:

16/11/27 14:06:15 INFO S3NativeFileSystem: Opening 's3: //myBucket/foo.parquet/id=123/day=2016-11-26/part-r-00003-b20752e9-5d70-43f5-b8b4 -50b5b4d0c7da.snappy.parquet 'para leitura

16/11/27 14:06:15 INFO S3NativeFileSystem: Fluxo para a chave 'foo.parquet / id = 123 / day = 2016-11-26 / part-r-00003-e80419de-7019-4859-bbe7-dcd392f6fcd3.snappy .parquet 'procura posição' 149195444 '

Parece que esta operação leva menos de 1 segundo por arquivo, mas a quantidade de arquivos aumenta com o tempo (cada anexo adiciona novos arquivos), o que me faz pensar que meu código não poderá ser dimensionado.

Alguma idéia de como evitar a leitura de arquivos antigos em parquet do s3, se eu precisar acrescentar novos dados?

Eu uso o EMR 4.8.2 e o DirectParquetOutputCommitter:

sc._jsc.hadoopConfiguration().set('spark.sql.parquet.output.committer.class', 'org.apache.spark.sql.parquet.DirectParquetOutputCommitter')

questionAnswers(1)

yourAnswerToTheQuestion