Spark Dataframe validiert Spaltennamen für Parkettschreibvorgänge (Scala)

Ich verarbeite Ereignisse mit Dataframes, die aus einem Stream von JSON-Ereignissen konvertiert wurden, der schließlich als Parkettformat ausgegeben wird.

Einige der JSON-Ereignisse enthalten jedoch Leerzeichen in den Schlüsseln, die ich protokollieren und aus dem Datenrahmen filtern / löschen möchte, bevor ich sie in Parquet konvertiere, da: {} () \ n \ t = als Sonderzeichen in Parkettschema (CatalystSchemaConverter) wie in @ aufgefüh [1] unter und sollte daher in den Spaltennamen nicht erlaubt sein.

Wie kann ich solche Überprüfungen in Dataframe für die Spaltennamen durchführen und ein solches Ereignis insgesamt löschen, ohne den Spark-Streaming-Job fehlerfrei auszuführen?

[1] Spark's CatalystSchemaConverter

def checkFieldName(name: String): Unit = {
    // ,;{}()\n\t= and space are special characters in Parquet schema
    checkConversionRequirement(
      !name.matches(".*[ ,;{}()\n\t=].*"),
      s"""Attribute name "$name" contains invalid character(s) among " ,;{}()\\n\\t=".
         |Please use alias to rename it.
       """.stripMargin.split("\n").mkString(" ").trim)
  }

Antworten auf die Frage(8)

Ihre Antwort auf die Frage