но я использую иск 2.3.0 .. можете ли вы предоставить, как использовать foreach в 2.3.0
запрос на интеграцию искровой структурированной потоковой передачи с таблицей HIVE.
Я попытался сделать несколько примеров потоковой структурированной искры.
вот мой пример
val spark =SparkSession.builder().appName("StatsAnalyzer")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.streaming.checkpointLocation", "hdfs://pp/apps/hive/warehouse/ab.db")
.getOrCreate()
// Register the dataframe as a Hive table
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark.readStream.option("sep", ",").schema(userSchema).csv("file:///home/su/testdelta")
csvDF.createOrReplaceTempView("updates")
val query= spark.sql("insert into table_abcd select * from updates")
query.writeStream.start()
Как вы можете видеть на последнем шаге при записи фрейма данных в местоположение hdfs, данные не вставляются в захватывающий каталог (мой существующий каталог содержит некоторые старые данные, разделенные на «age»).
я осознаю
spark.sql.AnalysisException: запросы с потоковым источником должны выполняться с помощью writeStream start ()
Можете ли вы помочь, почему я не могу вставить данные в существующий каталог в папке hdfs? или есть какой-то другой способ, которым я могу сделать операцию "вставить в" на таблице улья?
Ищем решение