Мой код для файла данных newfile1.csv и json file (схема) работает отлично, я просто пропускаю последний бит кода, где я могу динамически распределять заголовки и типы данных из файла json в файл csv.
ваша помощь в определении динамической схемы с полями и типами данных из файла JSon входных метаданных для данных в файле csv в Databricks.
Я хочу определить схему, которая отображает имя поля с соответствующим типом данных из JSON для ввода данных CSV.
Below is the JSon file:
{
"type": "record",
"name": "OTHTranDetail",
"fields": [
{
"name": "ParkNumb",
"type": {
"type": "int",
"connect.type": "int16"
},
"doc": "Park Number"
},
{
"name": "ParkId",
"type": {
"type": "int",
"connect.type": "int16"
},
"doc": "Parking Id"
}
],
"connect.name": "OTHTranDetail"
}
-- The above json file is named as Design.json
-----------------------------------------------------
--Below code is for data file and saving it to a .csv format-
import java.io._
import java.net._
import java.util._
import org.apache.spark.eventhubs._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming
val connectionString = ConnectionStringBuilder("connectionstring").setEventHubName("transactions_data").build
val customEventhubParameters = EventHubsConf(connectionString)
.setStartingPosition(EventPosition.fromEndOfStream)
.setStartingPosition(EventPosition.fromOffset("-1"))
val ConsumerDF = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).option("checkpointLocation", "/tmp/checkpoint").load()
val OTHDF = ConsumerDF.select($"body" cast "string")
OTHDF.printSchema
//Writes the data intothe csv file for dynamica schema
OTHDF.writeStream.format("csv").option("truncate", false).option("inferschema",false).option("checkpointLocation", "/path/events1/_checkpoints/etl-from-json").start("/mnt/newfile1.csv")
Вывод файла данных в формате CSV, как показано ниже:
--- Reading the json file as below-
//json metadata file
import org.apache.spark.sql.DataFrameReader
import spark.implicits._
import org.apache.spark.rdd.RDD
val json1 = sc.wholeTextFiles("/FileStore/tables/Design.json").
map(tuple => tuple._2.replace("\n", "").trim)
val df = spark.read.json(json1.toDS)
df.printSchema()
json1.collect()
Я попытался использовать приведенный ниже (используя переполнение стека в одном из сообщений) -
val csv_df = spark.read.format("csv").schema(json1).load("newfile1.csv")
Но он выдает ошибку для schema ().
Может кто-нибудь предложить мне, что было бы правильным способом сделать это?
Мой код для файла данных newfile1.csv и json file (схема) работает отлично, я просто пропускаю последний бит кода, где я могу динамически распределять заголовки и типы данных из файла json в файл csv.