google.com/...
лаем потоковую передачу данных Кафки, которые собираются из MySQL. Теперь, когда вся аналитика сделана, я хочу сохранить свои данные прямо в Hbase. Я пролистал документ с потоковой структурой искры, но не смог найти ни одного приемника с Hbase. Код, который я использовал для чтения данных из Кафки, приведен ниже.
val records = spark.readStream.format("kafka").option("subscribe", "kaapociot").option("kafka.bootstrap.servers", "XX.XX.XX.XX:6667").option("startingOffsets", "earliest").load
val jsonschema = StructType(Seq(StructField("header", StringType, true),StructField("event", StringType, true)))
val uschema = StructType(Seq(
StructField("MeterNumber", StringType, true),
StructField("Utility", StringType, true),
StructField("VendorServiceNumber", StringType, true),
StructField("VendorName", StringType, true),
StructField("SiteNumber", StringType, true),
StructField("SiteName", StringType, true),
StructField("Location", StringType, true),
StructField("timestamp", LongType, true),
StructField("power", DoubleType, true)
))
val DF_Hbase = records.selectExpr("cast (value as string) as Json").select(from_json($"json",schema=jsonschema).as("data")).select("data.event").select(from_json($"event", uschema).as("mykafkadata")).select("mykafkadata.*")
Теперь, наконец, я хочу сохранить фрейм данных DF_Hbase в hbase.