но я использую иск 2.3.0 .. можете ли вы предоставить, как использовать foreach в 2.3.0

запрос на интеграцию искровой структурированной потоковой передачи с таблицей HIVE.

Я попытался сделать несколько примеров потоковой структурированной искры.

вот мой пример

 val spark =SparkSession.builder().appName("StatsAnalyzer")
     .enableHiveSupport()
     .config("hive.exec.dynamic.partition", "true")
     .config("hive.exec.dynamic.partition.mode", "nonstrict")
     .config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
     .getOrCreate()

 // Register the dataframe as a Hive table

 val userSchema = new StructType().add("name", "string").add("age", "integer")
 val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta") 
 csvDF.createOrReplaceTempView("updates")
 val query= spark.sql("insert into table_abcd select * from updates")

 query.writeStream.start()

Как вы можете видеть на последнем шаге при записи фрейма данных в местоположение hdfs, данные не вставляются в захватывающий каталог (мой существующий каталог содержит некоторые старые данные, разделенные на «age»).

я осознаю

spark.sql.AnalysisException: запросы с потоковым источником должны выполняться с помощью writeStream start ()

Можете ли вы помочь, почему я не могу вставить данные в существующий каталог в папке hdfs? или есть какой-то другой способ, которым я могу сделать операцию "вставить в" на таблице улья?

Ищем решение

Ответы на вопрос(2)

Ваш ответ на вопрос