Классы потоковой обработки (streaming) понадобятся вам во время выполнения, не так ли? Впрочем, это не обязательно плохая идея: возможно, ваш дистрибутив Hadoop уже предоставляет эти двоичные файлы, и тогда упаковывать их в собственный jar не нужно.

Я написал простое потоковое приложение Kafka на Scala. Локально оно работает хорошо. Я собрал fat jar и отправил его в кластер Spark. После отправки задания я получаю ошибку «класс не найден» (ClassNotFoundException). При этом, если распаковать fat jar, все зависимости находятся внутри него.

Почему я получаю ошибку «класс не найден»? Как её устранить?

Примечание: если я вручную разверну (скопирую) fat jar в папку Spark/jars, никаких проблем не возникает. Но это неправильный подход.

Я использую Windows 7 и запускаю главный (master) и рабочий (worker) узлы на одной машине.

JOB Submit

spark-2.2\bin>spark-submit --class Welcome --master spark://169.254.208.125:7077 C:\Gnana\cass-conn-assembly-0.1.jar

Код

import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkConf
import com.datastax.spark.connector.streaming._


/**
  * Spark Streaming job that reads comma-separated records from the Kafka topic `test`
  * and saves them to the Cassandra table `inventory.emp`.
  */
object Welcome {

  import scala.util.Try

  /**
    * Parses one Kafka message value of the form `emp_id,create_date,emp_city,emp_name`.
    * Fields beyond the fourth are ignored.
    *
    * @param line raw message value (may be null)
    * @return the parsed `(emp_id, create_date, emp_city, emp_name)` tuple, or `None` when
    *         the value is null, has fewer than four fields, or `emp_id` is not an integer
    */
  private def parseEmp(line: String): Option[(Int, String, String, String)] =
    Option(line).map(_.split(",")).flatMap {
      case Array(id, createDate, city, name, _*) =>
        Try(id.toInt).toOption.map(empId => (empId, createDate, city, name))
      case _ => None
    }

  /**
    * Entry point: builds the streaming pipeline Kafka -> parse -> Cassandra and blocks
    * until the streaming context terminates.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("demo")
      // setIfMissing instead of setMaster: a value set in code would override the
      // --master passed to spark-submit; this keeps the URL only as a fallback.
      .setIfMissing("spark.master", "spark://169.254.208.125:7077")
    conf.set("spark.cassandra.connection.host", "192.168.1.2")
    val sc = new SparkContext(conf)

    sc.setLogLevel("WARN")
    val ssc = new StreamingContext(sc, Milliseconds(100))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.1.2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("test")
    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    val lines = messages.map(_.value)

    // Skip (and report) malformed records instead of letting a single bad message
    // throw inside the task and fail the batch.
    val rows = lines.flatMap { line =>
      val parsed = parseEmp(line)
      if (parsed.isEmpty) System.err.println(s"Skipping malformed record: $line")
      parsed.toList
    }
    rows.saveToCassandra("inventory", "emp", SomeColumns("emp_id", "create_date", "emp_city", "emp_name"))
    println(" Spark is ready !!!!!! ")

    /*sys.ShutdownHookThread {
      println("Gracefully stopping Spark Streaming Application")
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      println("Application stopped")
    }*/

    ssc.start()
    ssc.awaitTermination()
  }

  /**
    * Prints a greeting followed by `msg` to standard output (no trailing newline).
    *
    * @param msg text appended to the greeting
    */
  def sayHello(msg: String): Unit = {
    print(s"welcome to Sacala $msg")
  }
}

build.sbt

// Build definition for the cass-conn streaming job; `sbt assembly` builds the fat jar.
organization := "com.demo"
name := "cass-conn"
version := "0.1"
scalaVersion := "2.11.8"

// Single place for library versions so all Spark modules stay on the same release.
val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"

libraryDependencies ++= Seq(
  // "provided" modules are expected on the cluster's classpath and are left out of the assembly.
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // Not marked "provided": these are bundled into the assembly together with their
  // transitive dependencies.
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)

// `assemblyMergeStrategy` replaces the deprecated `mergeStrategy` key.
assemblyMergeStrategy in assembly := {
  // The same stub class is present in more than one Spark artifact; keep the first copy.
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x =>
    // Fall back to the plugin's default strategy for every other path.
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

Ошибка 1:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/Consumer
        at org.apache.spark.streaming.kafka010.ConsumerStrategies$.Subscribe(ConsumerStrategy.scala:256)
        at Welcome$.main(Welcome.scala:32)
        at Welcome.main(Welcome.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.sca
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.Consumer
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 12 more

Журнал сборки

[info] Done updating.
[info] Compiling 2 Scala sources to C:\Gnana\cass-conn\target\scala-2.11\classes ...
[info] Done compiling.
[info] Including: slf4j-api-1.7.21.jar
[info] Including: joda-time-2.3.jar
[info] Including: kafka_2.11-0.10.1.0.jar
[info] Including: scala-library-2.11.8.jar
[info] Including: jopt-simple-4.9.jar
[info] Including: metrics-core-2.2.0.jar
[info] Including: slf4j-log4j12-1.7.21.jar
[info] Including: log4j-1.2.17.jar
[info] Including: joda-convert-1.2.jar
[info] Including: zkclient-0.9.jar
[info] Including: scala-reflect-2.11.8.jar
[info] Including: zookeeper-3.4.8.jar
[info] Including: jline-0.9.94.jar
[info] Including: netty-3.7.0.Final.jar
[info] Including: lz4-1.3.0.jar
[info] Including: scala-parser-combinators_2.11-1.0.4.jar
[info] Including: snappy-java-1.1.2.6.jar
[info] Including: kafka-clients-0.10.1.0.jar
[info] Including: spark-streaming-kafka-0-10_2.11-2.2.0.jar
[info] Including: spark-tags_2.11-2.2.0.jar
[info] Including: unused-1.0.0.jar
[info] Including: netty-all-4.0.33.Final.jar
[info] Including: commons-beanutils-1.9.3.jar
[info] Including: jsr166e-1.1.0.jar
[info] Including: spark-cassandra-connector_2.11-2.0.7.jar
[info] Including: commons-collections-3.2.2.jar
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[warn] Merging 'NOTICE' with strategy 'rename'
[warn] Merging 'META-INF\NOTICE.txt' with strategy 'rename'
[warn] Merging 'META-INF\NOTICE' with strategy 'rename'
[warn] Merging 'org\xerial\snappy\native\README' with strategy 'rename'
[warn] Merging 'META-INF\LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF\license' with strategy 'rename'
[warn] Merging 'LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF\LICENSE' with strategy 'rename'
[warn] Merging 'LICENSE' with strategy 'rename'
[warn] Merging 'META-INF\DEPENDENCIES' with strategy 'discard'
[warn] Merging 'META-INF\INDEX.LIST' with strategy 'discard'
[warn] Merging 'META-INF\MANIFEST.MF' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-core\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-core\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-mapping\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-mapping\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jffi\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jffi\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-constants\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-constants\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-ffi\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-ffi\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-posix\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-posix\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-x86asm\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-x86asm\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.google.guava\guava\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.google.guava\guava\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.twitter\jsr166e\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.twitter\jsr166e\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.yammer.metrics\metrics-core\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.yammer.metrics\metrics-core\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-beanutils\commons-beanutils\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-beanutils\commons-beanutils\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-collections\commons-collections\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-collections\commons-collections\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty-all\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty-all\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\jline\jline\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\jline\jline\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\joda-time\joda-time\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\joda-time\joda-time\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\log4j\log4j\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\log4j\log4j\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\net.sf.jopt-simple\jopt-simple\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\net.sf.jopt-simple\jopt-simple\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-streaming-kafka-0-10_2.11\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-streaming-kafka-0-10_2.11\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-tags_2.11\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-tags_2.11\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.joda\joda-convert\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.joda\joda-convert\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-api\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-api\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-log4j12\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-log4j12\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.spark-project.spark\unused\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.spark-project.spark\unused\pom.xml' with strategy 'discard'
[warn] Merging 'org\apache\spark\unused\UnusedStubClass.class' with strategy 'first'
[warn] Strategy 'discard' was applied to 51 files
[warn] Strategy 'first' was applied to a file
[warn] Strategy 'rename' was applied to 9 files
[info] SHA-1: 85f8513511b46290883ab70f2525b04a8d3c33d7

Ответы на вопрос(1)

Ваш ответ на вопрос