Warum sind Spark Parkett-Dateien für ein Aggregat größer als das Original?

Ich versuche, eine Gesamtdatei für Endbenutzer zu erstellen, um zu vermeiden, dass diese mehrere Quellen mit viel größeren Dateien verarbeiten. Dazu gehe ich folgendermaßen vor: A) Durchsuche alle Quellordner, entferne 12 am häufigsten angeforderte Felder und spinne Parkettdateien an einem neuen Ort aus, an dem sich diese Ergebnisse zusammen befinden. B) Ich versuche, die in Schritt A erstellten Dateien erneut durchzugehen und sie durch Gruppieren nach den 12 Feldern zu aggregieren, um sie für jede eindeutige Kombination zu einer Zusammenfassungszeile zu reduzieren.

Was ich finde, ist, dass Schritt A die Nutzlast 5: 1 reduziert (ungefähr 250 Gigs werden zu 48,5 Gigs). In Schritt B wird dies jedoch nicht weiter reduziert, sondern gegenüber Schritt A um 50% erhöht. Meine Zählungen stimmen jedoch überein.

Dies verwendet Spark 1.5.2
Mein Code, der nur geändert wurde, um die Feldnamen durch field1 ... field12 zu ersetzen, um die Lesbarkeit zu verbessern, enthält die unten angegebenen Ergebnisse.

Während ich nicht unbedingt eine weitere Reduzierung von 5: 1 erwarte, weiß ich nicht, was ich falsch mache, um die Speicherseite für weniger Zeilen mit demselben Schema zu erhöhen. Kann mir jemand helfen zu verstehen, was ich falsch gemacht habe?

Vielen Dank

//for each eventName found in separate source folders, do the following:
//spit out one row with key fields from the original dataset for quicker availability to clients 
//results in a 5:1 reduction in size
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, cast(1 as bigint) as rCount from table"
sqlContext.sql(sqlCommand).coalesce(20).write.parquet("<aws folder>" + dt + "/" + eventName + "/")
//results in over 700 files with a total of  16,969,050,506 rows consuming 48.65 gigs of storage space in S3, compressed 

//after all events are processed, aggregate the results
val sqlStatement = "Select field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12, sum(rCount) as rCount from results group by field1, field2, field3, field4, field5, field6, field7, field8, field9, field10, field11, field12"
//Use a wildcard to search all sub-folders created above
sqlContext.read.parquet("<aws folder>" + dt + "/*/").registerTempTable("results")
sqlContext.sql(sqlStatement).coalesce(20).saveAsParquetFile("<a new aws folder>" + dt + "/")
//This results in  3,295,206,761 rows with an aggregate value of 16,969,050,506 for rCount but consumes 79.32 gigs of storage space in S3, compressed

//The parquet schemas created (both tables match):
 |-- field1: string (nullable = true) (10 characters)
 |-- field2: string (nullable = true) (15 characters)
 |-- field3: string (nullable = true) (50 characters max)
 |-- field4: string (nullable = true) (10 characters)
 |-- field5: string (nullable = true) (10 characters)
 |-- field6: string (nullable = true) (10 characters)
 |-- field7: string (nullable = true) (16 characters)
 |-- field8: string (nullable = true) (10 characters)
 |-- field9 string (nullable = true)  (15 characters)
 |-- field10: string (nullable = true)(20 characters)
 |-- field11: string (nullable = true)(14 characters)
 |-- field12: string (nullable = true)(14 characters)
 |-- rCount: long (nullable = true)   
 |-- dt: string (nullable = true)

Antworten auf die Frage(2)

Ihre Antwort auf die Frage