Por que os arquivos Spark Parquet para um agregado são maiores que o original?
Estou tentando criar um arquivo agregado para os usuários finais utilizarem para evitar que eles processem várias fontes com arquivos muito maiores. Para fazer isso, I: A) repete todas as pastas de origem, removendo os 12 campos mais solicitados, girando os arquivos de parquet em um novo local onde esses resultados estão localizados. B) Eu tento voltar pelos arquivos criados na etapa A e agregá-los novamente agrupando os 12 campos para reduzi-lo a uma linha de resumo para cada combinação exclusiva.
O que eu estou descobrindo é que a etapa A reduz a carga útil 5: 1 (aproximadamente 250 GB se tornam 48,5 GB). A Etapa B, no entanto, em vez de reduzir ainda mais isso, aumente em 50% em relação à etapa A. No entanto, minhas contagens correspondem.
Isso está usando o Spark 1.5.2
Meu código, modificado apenas para substituir os nomes dos campos por campo1 ... campo12 para torná-lo mais legível, está abaixo com os resultados que eu observei.
Embora eu não espere necessariamente outra redução de 5: 1, não sei o que estou fazendo incorretamente para aumentar o lado do armazenamento para menos linhas com o mesmo esquema. Alguém capaz de me ajudar a entender o que fiz de errado?
Obrigado!
//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of 16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed
//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in 3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed
//The parquet schemas created (both tables match):
|-- field1: string (nullable = true) (10 characters)
|-- field2: string (nullable = true) (15 characters)
|-- field3: string (nullable = true) (50 characters max)
|-- field4: string (nullable = true) (10 characters)
|-- field5: string (nullable = true) (10 characters)
|-- field6: string (nullable = true) (10 characters)
|-- field7: string (nullable = true) (16 characters)
|-- field8: string (nullable = true) (10 characters)
|-- field9 string (nullable = true) (15 characters)
|-- field10: string (nullable = true)(20 characters)
|-- field11: string (nullable = true)(14 characters)
|-- field12: string (nullable = true)(14 characters)
|-- rCount: long (nullable = true)
|-- dt: string (nullable = true)