Мой код для файла данных newfile1.csv и json file (схема) работает отлично, я просто пропускаю последний бит кода, где я могу динамически распределять заголовки и типы данных из файла json в файл csv.

ваша помощь в определении динамической схемы с полями и типами данных из файла JSon входных метаданных для данных в файле csv в Databricks.

Я хочу определить схему, которая отображает имя поля с соответствующим типом данных из JSON для ввода данных CSV.

Below is the JSon file:

{
  "type": "record",
  "name": "OTHTranDetail",
  "fields": [
    {
      "name": "ParkNumb",
      "type": {
        "type": "int",
        "connect.type": "int16"
      },
      "doc": "Park Number"
    },
    {
      "name": "ParkId",
      "type": {
        "type": "int",
        "connect.type": "int16"
      },
      "doc": "Parking Id"
    }
  ],
  "connect.name": "OTHTranDetail"
}


-- The above json file is named as Design.json  
-----------------------------------------------------


--Below code is for data file and saving it to a .csv format-


import java.io._
import java.net._
import java.util._
import org.apache.spark.eventhubs._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming

val connectionString = ConnectionStringBuilder("connectionstring").setEventHubName("transactions_data").build

val customEventhubParameters = EventHubsConf(connectionString)
                              .setStartingPosition(EventPosition.fromEndOfStream)
                              .setStartingPosition(EventPosition.fromOffset("-1"))

val ConsumerDF = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).option("checkpointLocation", "/tmp/checkpoint").load()

val OTHDF = ConsumerDF.select($"body" cast "string") 
OTHDF.printSchema

//Writes the data intothe csv file for dynamica schema
OTHDF.writeStream.format("csv").option("truncate", false).option("inferschema",false).option("checkpointLocation", "/path/events1/_checkpoints/etl-from-json").start("/mnt/newfile1.csv")

Вывод файла данных в формате CSV, как показано ниже:

--- Reading the json file as below-

//json metadata file

import org.apache.spark.sql.DataFrameReader
import spark.implicits._
import org.apache.spark.rdd.RDD
val json1 = sc.wholeTextFiles("/FileStore/tables/Design.json").
  map(tuple => tuple._2.replace("\n", "").trim)
val df = spark.read.json(json1.toDS) 
df.printSchema()
json1.collect()

Я попытался использовать приведенный ниже (используя переполнение стека в одном из сообщений) -

val csv_df = spark.read.format("csv").schema(json1).load("newfile1.csv")

Но он выдает ошибку для schema ().
Может кто-нибудь предложить мне, что было бы правильным способом сделать это?

Мой код для файла данных newfile1.csv и json file (схема) работает отлично, я просто пропускаю последний бит кода, где я могу динамически распределять заголовки и типы данных из файла json в файл csv.

Ответы на вопрос(0)

Ваш ответ на вопрос