Esquema gerado dinamicamente no arquivo json para um arquivo de dados csv no scala
Precisa de ajuda para definir um esquema dinâmico com campos e tipos de dados do arquivo JSon de metadados de entrada para os dados em um arquivo csv no Databrick
Desejo definir um esquema que mapeie o nome do campo com o tipo de dados correspondente do JSON para inserir dados CS
Below is the JSon file:
{
"type": "record",
"name": "OTHTranDetail",
"fields": [
{
"name": "ParkNumb",
"type": {
"type": "int",
"connect.type": "int16"
},
"doc": "Park Number"
},
{
"name": "ParkId",
"type": {
"type": "int",
"connect.type": "int16"
},
"doc": "Parking Id"
}
],
"connect.name": "OTHTranDetail"
}
-- The above json file is named as Design.json
-----------------------------------------------------
--Below code is for data file and saving it to a .csv format-
import java.io._
import java.net._
import java.util._
import org.apache.spark.eventhubs._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming
val connectionString = ConnectionStringBuilder("connectionstring").setEventHubName("transactions_data").build
val customEventhubParameters = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)
.setStartingPosition(EventPosition.fromOffset("-1"))
val ConsumerDF = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).option("checkpointLocation", "/tmp/checkpoint").load()
val OTHDF = ConsumerDF.select($"body" cast "string")
OTHDF.printSchema
//Writes the data intothe csv file for dynamica schema
OTHDF.writeStream.format("csv").option("truncate", false).option("inferschema",false).option("checkpointLocation", "/path/events1/_checkpoints/etl-from-json").start("/mnt/newfile1.csv")
Saída do arquivo de dados em formato csv como abaixo-
--- Reading the json file as below-
//json metadata file
import org.apache.spark.sql.DataFrameReader
import spark.implicits._
import org.apache.spark.rdd.RDD
val json1 = sc.wholeTextFiles("/FileStore/tables/Design.json").
map(tuple => tuple._2.replace("\n", "").trim)
val df = spark.read.json(json1.toDS)
df.printSchema()
json1.collect()
Eu tentei usar o abaixo (usando estouro de pilha em uma das postagens) -
val csv_df = spark.read.format("csv").schema(json1).load("newfile1.csv")
Mas gera um erro para schema ().
Alguém pode me sugerir qual seria a maneira correta de fazer isso?
Meu código para o arquivo de dados newfile1.csv e o arquivo json (esquema) funciona perfeitamente, só falta o último bit de código em que posso alocar dinamicamente os cabeçalhos e tipos de dados do arquivo json para o arquivo cs