a función .count () de @ Spark es diferente al contenido del marco de datos cuando se filtra en el campo de registro corrupto
Tengo un trabajo de Spark, escrito en Python, que está obteniendo un comportamiento extraño al verificar los errores en sus datos. A continuación se muestra una versión simplificada:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, StructField, DoubleType
from pyspark.sql.functions import col, lit
spark = SparkSession.builder.master("local[3]").appName("pyspark-unittest").getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
SCHEMA = StructType([
StructField("headerDouble", DoubleType(), False),
StructField("ErrorField", StringType(), False)
])
dataframe = (
spark.read
.option("header", "true")
.option("mode", "PERMISSIVE")
.option("columnNameOfCorruptRecord", "ErrorField")
.schema(SCHEMA).csv("./x.csv")
)
total_row_count = dataframe.count()
print("total_row_count = " + str(total_row_count))
errors = dataframe.filter(col("ErrorField").isNotNull())
errors.show()
error_count = errors.count()
print("errors count = " + str(error_count))
El csv que está leyendo es simplemente:
headerDouble
wrong
La salida relevante de esto es
total_row_count = 1
+------------+----------+
|headerDouble|ErrorField|
+------------+----------+
| null| wrong|
+------------+----------+
errors count = 0
Ahora, ¿cómo puede suceder esto? Si el marco de datos tiene un registro, ¿cómo se cuenta como 0? ¿Es esto un error en la infraestructura de Spark o me falta algo?
EDIT: Parece que esto podría ser un error conocido en Spark 2.2 que se ha solucionado en Spark 2.3 -https: //issues.apache.org/jira/browse/SPARK-2161