¿Por qué los archivos de Spark Parquet para un agregado son más grandes que el original?
Estoy tratando de crear un archivo agregado para que los usuarios finales lo utilicen para evitar que procesen múltiples fuentes con archivos mucho más grandes. Para hacer eso, I: A) recorro todas las carpetas de origen, eliminando 12 campos que se solicitan con mayor frecuencia, girando los archivos de parquet en una nueva ubicación donde estos resultados se ubican conjuntamente. B) Intento volver a los archivos creados en el paso A y volver a agregarlos agrupándolos por los 12 campos para reducirlos a una fila de resumen para cada combinación única.
Lo que encuentro es que el paso A reduce la carga útil 5: 1 (aproximadamente 250 gigas se convierten en 48.5 gigas). Sin embargo, el paso B, en lugar de reducir aún más esto, aumenta en un 50% con respecto al paso A. Sin embargo, mis recuentos coinciden.
Esto está usando Spark 1.5.2
Mi código, modificado solo para reemplazar los nombres de campo con field1 ... field12 para hacerlo más legible, está debajo con los resultados que he notado.
Si bien no necesariamente espero otra reducción de 5: 1, no sé qué estoy haciendo incorrectamente para aumentar el lado de almacenamiento para menos filas con el mismo esquema. ¿Alguien capaz de ayudarme a entender lo que hice mal?
¡Gracias!
//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of 16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed
//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in 3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed
//The parquet schemas created (both tables match):
|-- field1: string (nullable = true) (10 characters)
|-- field2: string (nullable = true) (15 characters)
|-- field3: string (nullable = true) (50 characters max)
|-- field4: string (nullable = true) (10 characters)
|-- field5: string (nullable = true) (10 characters)
|-- field6: string (nullable = true) (10 characters)
|-- field7: string (nullable = true) (16 characters)
|-- field8: string (nullable = true) (10 characters)
|-- field9 string (nullable = true) (15 characters)
|-- field10: string (nullable = true)(20 characters)
|-- field11: string (nullable = true)(14 characters)
|-- field12: string (nullable = true)(14 characters)
|-- rCount: long (nullable = true)
|-- dt: string (nullable = true)