google.com/...

лаем потоковую передачу данных Кафки, которые собираются из MySQL. Теперь, когда вся аналитика сделана, я хочу сохранить свои данные прямо в Hbase. Я пролистал документ с потоковой структурой искры, но не смог найти ни одного приемника с Hbase. Код, который я использовал для чтения данных из Кафки, приведен ниже.

 val records = spark.readStream.format("kafka").option("subscribe", "kaapociot").option("kafka.bootstrap.servers", "XX.XX.XX.XX:6667").option("startingOffsets", "earliest").load
 val jsonschema = StructType(Seq(StructField("header", StringType, true),StructField("event", StringType, true)))
 val uschema = StructType(Seq(
             StructField("MeterNumber", StringType, true),
             StructField("Utility", StringType, true),
             StructField("VendorServiceNumber", StringType, true),
             StructField("VendorName", StringType, true),
             StructField("SiteNumber",  StringType, true),
             StructField("SiteName", StringType, true),
             StructField("Location", StringType, true),
             StructField("timestamp", LongType, true),
             StructField("power", DoubleType, true)
             ))
 val DF_Hbase = records.selectExpr("cast (value as string) as Json").select(from_json($"json",schema=jsonschema).as("data")).select("data.event").select(from_json($"event", uschema).as("mykafkadata")).select("mykafkadata.*")

Теперь, наконец, я хочу сохранить фрейм данных DF_Hbase в hbase.

 Vignesh I07 нояб. 2017 г., 08:22

Ответы на вопрос(3)

1- добавьте эти библиотеки в ваш проект:

      "org..hbase" % "hbase-client" % "2.0.1"
      "org..hbase" % "hbase-common" % "2.0.1"

2 - добавьте эту черту в ваш код:

   import java.util.concurrent.ExecutorService
   import org..hadoop.hbase.client.{Connection, ConnectionFactory, Put, Table}
   import org..hadoop.hbase.security.User
   import org..hadoop.hbase.{HBaseConfiguration, TableName}
   import org..spark.sql.ForeachWriter

   trait HBaseForeachWriter[RECORD] extends ForeachWriter[RECORD] {

     val tableName: String
     val hbaseConfResources: Seq[String]

     def pool: Option[ExecutorService] = None

     def user: Option[User] = None

     private var hTable: Table = _
     private var connection: Connection = _


     override def open(partitionId: Long, version: Long): Boolean = {
       connection = createConnection()
       hTable = getHTable(connection)
       true
     }

     def createConnection(): Connection = {
       val hbaseConfig = HBaseConfiguration.create()
       hbaseConfResources.foreach(hbaseConfig.addResource)
       ConnectionFactory.createConnection(hbaseConfig, pool.orNull,                      user.orNull)

     }

     def getHTable(connection: Connection): Table = {
       connection.getTable(TableName.valueOf(tableName))
     }

     override def process(record: RECORD): Unit = {
       val put = toPut(record)
       hTable.put(put)
     }

     override def close(errorOrNull: Throwable): Unit = {
       hTable.close()
       connection.close()
     }

     def toPut(record: RECORD): Put

   }

3- используйте его для своей логики:

    val ds = .... //anyDataset[WhatEverYourDataType]

    val query = ds.writeStream
           .foreach(new HBaseForeachWriter[WhatEverYourDataType] {
                            override val tableName: String = "hbase-table-name"
                            //your cluster files, i assume here it is in resources  
                            override val hbaseConfResources: Seq[String] = Seq("core-site.xml", "hbase-site.xml") 

                            override def toPut(record: WhatEverYourDataType): Put = {
                              val key = .....
                              val columnFamaliyName : String = ....
                              val columnName : String = ....
                              val columnValue = ....

                              val p = new Put(Bytes.toBytes(key))
                              //Add columns ... 
                   p.addColumn(Bytes.toBytes(columnFamaliyName),
                               Bytes.toBytes(columnName), 
                               Bytes.toBytes(columnValue))

                              p
                            }

                          }
           ).start()

         query.awaitTermination()
 linehrr25 окт. 2018 г., 18:40
это правильный подход, можете ли вы также добавить более сложную логику, чтобы сделать этот приемник идемпотентным для достижения семантики точной однократной доставки.

поступающие с Kafka? Или просто качать его на HBase? Возможность рассмотреть использованиеКафка Коннект, Это дает вам подход на основе файла конфигурации для интеграции Kafka с другими системами, включая HBase.

 Chris Snow07 окт. 2018 г., 09:35
Я только что проверил и не смог найти разъем hbase
 Robin Moffatt07 окт. 2018 г., 19:17
есть как минимум два (хотя я их не пробовал):google.com/...

https://github.com/hortonworks-spark/shc/issues/205

package HBase
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.datasources.hbase._

class HBaseSink(options: Map[String, String]) extends Sink with Logging {
  // String with HBaseTableCatalog.tableCatalog
  private val hBaseCatalog = options.get("hbasecat").map(_.toString).getOrElse("")

  override def addBatch(batchId: Long, data: DataFrame): Unit = synchronized {   
    val df = data.sparkSession.createDataFrame(data.rdd, data.schema)
    df.write
      .options(Map(HBaseTableCatalog.tableCatalog->hBaseCatalog,
        HBaseTableCatalog.newTable -> "5"))
      .format("org.apache.spark.sql.execution.datasources.hbase").save()
  }
}

class HBaseSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
                  sqlContext: SQLContext,
                  parameters: Map[String, String],
                  partitionColumns: Seq[String],
                  outputMode: OutputMode): Sink = {
    new HBaseSink(parameters)
  }

  def shortName(): String = "hbase"
}

Я добавил файл с именем HBaseSinkProvider.scala вshc/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase и построил, пример работает отлично

Вот пример того, как использовать (scala):

inputDF.
   writeStream.
   queryName("hbase writer").
   format("HBase.HBaseSinkProvider").
   option("checkpointLocation", checkPointProdPath).
   option("hbasecat", catalog).
   outputMode(OutputMode.Update()).
   trigger(Trigger.ProcessingTime(30.seconds)).
   start

И пример того, как я использую его в Python:

inputDF \
    .writeStream \
    .outputMode("append") \
    .format('HBase.HBaseSinkProvider') \
    .option('hbasecat', catalog_kafka) \
    .option("checkpointLocation", '/tmp/checkpoint') \
    .start()
 HAVB23 мар. 2018 г., 14:22
Пожалуйста, рассмотрите описание решения (а не просто ссылку на другой сайт). «Автономные» ответы предпочтительнее, так как они не полагаются на внешние ресурсы, которые могут измениться или исчезнуть в будущем, а также позволяют пользователям сразу же получать доступ к информации, которую они ищут, вместо того, чтобы открывать другую вкладку браузера.

Ваш ответ на вопрос