Вам ведь нужны классы Spark Streaming во время выполнения? Впрочем, это может быть и неплохой идеей: возможно, ваш дистрибутив Hadoop уже предоставляет эти библиотеки, и тогда вам не нужно упаковывать их в собственный jar.
Я написал простое потоковое приложение для Kafka на Scala. Локально оно работает хорошо. Я собрал fat jar и отправил его в кластер Spark. После отправки задания я получаю ошибку «класс не найден». Если распаковать fat jar, нужная зависимость в нём присутствует.
Почему я получаю ошибку «класс не найден»? Как это исправить?
Примечание: если вручную скопировать fat jar в папку Spark/jars, никаких проблем не возникает. Но это неправильный подход.
Я использую Windows 7 и запускаю главный (master) и рабочий (worker) узлы на одной машине.
Отправка задания
spark-2.2\bin>spark-submit --class Welcome --master spark://169.254.208.125:7077 C:\Gnana\cass-conn-assembly-0.1.jar
Код
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkConf
import com.datastax.spark.connector.streaming._
/**
 * Spark Streaming job that reads comma-separated records from the Kafka topic "test"
 * and saves them to the Cassandra table inventory.emp.
 */
object Welcome {

  /**
   * Entry point, launched via spark-submit.
   *
   * @param args command-line arguments (currently unused)
   */
  def main(args: Array[String]): Unit = {
    // Properties set directly on a SparkConf take precedence over spark-submit's
    // --master / --conf flags, so these are only fallbacks used when nothing was supplied.
    val conf = new SparkConf()
      .setAppName("demo")
      .setIfMissing("spark.master", "spark://169.254.208.125:7077")
      .setIfMissing("spark.cassandra.connection.host", "192.168.1.2")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // Micro-batch interval of 100 ms.
    val ssc = new StreamingContext(sc, Milliseconds(100))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.1.2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      // The Kafka consumer does not commit offsets on its own.
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val topics = Array("test")

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

    // Records that cannot be parsed are dropped here rather than throwing inside the
    // batch, which would otherwise fail the whole streaming job on one bad message.
    messages
      .map(_.value)
      .flatMap(line => parseEmployee(line).toList)
      .saveToCassandra("inventory", "emp", SomeColumns("emp_id", "create_date", "emp_city", "emp_name"))

    println(" Spark is ready !!!!!! ")
    /*sys.ShutdownHookThread {
      println("Gracefully stopping Spark Streaming Application")
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      println("Application stopped")
    }*/
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Parses one message value of the form `emp_id,create_date,emp_city,emp_name`.
   *
   * @param line raw Kafka record value; may be null
   * @return the parsed `(emp_id, create_date, emp_city, emp_name)` tuple, or None when the
   *         value is null, has fewer than four comma-separated fields, or its first field
   *         (after trimming) is not an integer. Fields after the fourth are ignored.
   */
  private def parseEmployee(line: String): Option[(Int, String, String, String)] = {
    import scala.util.Try
    // A limit of -1 keeps trailing empty fields, so an empty emp_name does not shrink the array.
    Option(line)
      .map(_.split(",", -1))
      .filter(_.length >= 4)
      .flatMap { fields =>
        Try(fields(0).trim.toInt).toOption.map(id => (id, fields(1), fields(2), fields(3)))
      }
  }

  /** Prints a greeting followed by `msg` to stdout, without a trailing newline. */
  def sayHello(msg: String): Unit =
    print(s"welcome to Scala $msg")
}
build.sbt
// Build definition for the cass-conn assembly (fat) jar.
organization := "com.demo"
name := "cass-conn"
version := "0.1"
scalaVersion := "2.11.8"

val sparkVersion = "2.2.0"
val connectorVersion = "2.0.7"

libraryDependencies ++= Seq(
  // Supplied by the Spark installation at runtime, so excluded from the assembly jar.
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-hive" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  // Not marked provided, so these (and their transitive deps, e.g. kafka-clients) are bundled.
  "com.datastax.spark" %% "spark-cassandra-connector" % connectorVersion,
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)

// Keep the first copy of Spark's duplicated UnusedStubClass; defer everything else
// to the previously configured merge strategy.
assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
Ошибка 1:
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/kafka/clients/consumer/Consumer
at org.apache.spark.streaming.kafka010.ConsumerStrategies$.Subscribe(ConsumerStrategy.scala:256)
at Welcome$.main(Welcome.scala:32)
at Welcome.main(Welcome.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.sca
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.clients.consumer.Consumer
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 12 more
Журнал сборки
[info] Done updating.
[info] Compiling 2 Scala sources to C:\Gnana\cass-conn\target\scala-2.11\classes ...
[info] Done compiling.
[info] Including: slf4j-api-1.7.21.jar
[info] Including: joda-time-2.3.jar
[info] Including: kafka_2.11-0.10.1.0.jar
[info] Including: scala-library-2.11.8.jar
[info] Including: jopt-simple-4.9.jar
[info] Including: metrics-core-2.2.0.jar
[info] Including: slf4j-log4j12-1.7.21.jar
[info] Including: log4j-1.2.17.jar
[info] Including: joda-convert-1.2.jar
[info] Including: zkclient-0.9.jar
[info] Including: scala-reflect-2.11.8.jar
[info] Including: zookeeper-3.4.8.jar
[info] Including: jline-0.9.94.jar
[info] Including: netty-3.7.0.Final.jar
[info] Including: lz4-1.3.0.jar
[info] Including: scala-parser-combinators_2.11-1.0.4.jar
[info] Including: snappy-java-1.1.2.6.jar
[info] Including: kafka-clients-0.10.1.0.jar
[info] Including: spark-streaming-kafka-0-10_2.11-2.2.0.jar
[info] Including: spark-tags_2.11-2.2.0.jar
[info] Including: unused-1.0.0.jar
[info] Including: netty-all-4.0.33.Final.jar
[info] Including: commons-beanutils-1.9.3.jar
[info] Including: jsr166e-1.1.0.jar
[info] Including: spark-cassandra-connector_2.11-2.0.7.jar
[info] Including: commons-collections-3.2.2.jar
[info] Checking every *.class/*.jar file's SHA-1.
[info] Merging files...
[warn] Merging 'NOTICE' with strategy 'rename'
[warn] Merging 'META-INF\NOTICE.txt' with strategy 'rename'
[warn] Merging 'META-INF\NOTICE' with strategy 'rename'
[warn] Merging 'org\xerial\snappy\native\README' with strategy 'rename'
[warn] Merging 'META-INF\LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF\license' with strategy 'rename'
[warn] Merging 'LICENSE.txt' with strategy 'rename'
[warn] Merging 'META-INF\LICENSE' with strategy 'rename'
[warn] Merging 'LICENSE' with strategy 'rename'
[warn] Merging 'META-INF\DEPENDENCIES' with strategy 'discard'
[warn] Merging 'META-INF\INDEX.LIST' with strategy 'discard'
[warn] Merging 'META-INF\MANIFEST.MF' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-core\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-core\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-mapping\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.datastax.cassandra\cassandra-driver-mapping\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jffi\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jffi\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-constants\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-constants\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-ffi\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-ffi\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-posix\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-posix\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-x86asm\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.github.jnr\jnr-x86asm\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.google.guava\guava\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.google.guava\guava\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.twitter\jsr166e\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.twitter\jsr166e\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.yammer.metrics\metrics-core\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\com.yammer.metrics\metrics-core\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-beanutils\commons-beanutils\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-beanutils\commons-beanutils\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-collections\commons-collections\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\commons-collections\commons-collections\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty-all\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty-all\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\io.netty\netty\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\jline\jline\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\jline\jline\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\joda-time\joda-time\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\joda-time\joda-time\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\log4j\log4j\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\log4j\log4j\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\net.sf.jopt-simple\jopt-simple\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\net.sf.jopt-simple\jopt-simple\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-streaming-kafka-0-10_2.11\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-streaming-kafka-0-10_2.11\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-tags_2.11\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.apache.spark\spark-tags_2.11\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.joda\joda-convert\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.joda\joda-convert\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-api\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-api\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-log4j12\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.slf4j\slf4j-log4j12\pom.xml' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.spark-project.spark\unused\pom.properties' with strategy 'discard'
[warn] Merging 'META-INF\maven\org.spark-project.spark\unused\pom.xml' with strategy 'discard'
[warn] Merging 'org\apache\spark\unused\UnusedStubClass.class' with strategy 'first'
[warn] Strategy 'discard' was applied to 51 files
[warn] Strategy 'first' was applied to a file
[warn] Strategy 'rename' was applied to 9 files
[info] SHA-1: 85f8513511b46290883ab70f2525b04a8d3c33d7