Подготовить пакетный оператор для хранения всех rdd в mysql, сгенерированных из потокового потока

Я пытаюсь вставить пакетные RDD, сгенерированные из Dstream с использованием искровой потоковой передачи в MySQL. Следующий код работает нормально, но проблема в том, что я создаю одно соединение для хранения каждого кортежа. Итак, чтобы избежать этого, я создал соединение вне foreachRDD, но это дало мне следующую ошибку:

Код:

realTimeAgg.foreachRDD{ x => if (x.toLocalIterator.nonEmpty) {
                    x.foreachPartition {
                        it =>
                            val conn = DriverManager.getConnection("jdbc:mysql://IP:Port/DbName,UserName,Password)                  
                            val insertData = conn.prepareStatement("INSERT INTO MySqlTable (col1, col2, col3, col4) VALUES (?,?,?,?) 

                            for (tuple <- it) {
                                insertData.setLong(1, tuple._1._3)
                                insertData.setString(2, tuple._1._1)
                                insertData.setString(3, tuple._1._2)
                                insertData.setLong(4, tuple._2)

                                insertData.executeUpdate()                              
                            }   
                            conn.close()
                    }
                }

Ошибка :

com.typesafe.config.ConfigException$BugOrBroken: com.typesafe.config.impl.SerializedConfigValue should not exist outside of serialization
    at com.typesafe.config.impl.SerializedConfigValue.shouldNotBeUsed(SerializedConfigValue.java:471)
    at com.typesafe.config.impl.SerializedConfigValue.unwrapped(SerializedConfigValue.java:482)
    at com.typesafe.config.impl.AbstractConfigValue.hashCode(AbstractConfigValue.java:272)
    at scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:391)
    at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
    at scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
    at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
    at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
    at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:87)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:159)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at Streamingjob$anonfun$main$1.apply(StreamingJob.scala:75)
    at Streamingjob$anonfun$main$1.apply(StreamingJob.scala:74)
    at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDDКод:$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDDКод:$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfunКод:$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfunКод:$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfunКод:$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/05/26 19:08:50 INFO JobScheduler: Finished job streaming job 1464269930000 ms.0 from job set of time 1464269930000 ms
16/05/26 19:08:50 INFO JobScheduler: Total delay: 0.332 s for time 1464269930000 ms (execution: 0.305 s)
16/05/26 19:08:50 ERROR JobScheduler: Error running job streaming job 1464269930000 ms.0
org.apache.spark.SparkException: Task not serializable

Более того, чтобы оптимизировать код, я хотел подготовить пакетный оператор, который можно было бы сохранить один раз вместо повторения каждого кортежа снова и снова. Но опять же это дало мне следующую ошибку. Итак, может ли кто-нибудь сказать мне, как подготовить пакетный оператор, в то же время создавая соединение только один раз.

Кроме того, как дать две строки соединения узлов mysql в случае, если один отключается, таким образом, это должно переключиться на строку соединения аварийного переключения.

Ошибка:

ERROR JobScheduler: Error running job streaming job 1464269590000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$clean(ClosureCleaner.scala:294)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
    at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1.apply(RDD.scala:919)
    at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1.apply(RDD.scala:918)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
    at Streamingjob$anonfun$main$1.apply(StreamingJob.scala:78)
    at Streamingjob$anonfun$main$1.apply(StreamingJob.scala:77)
    at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDDКод:$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDDКод:$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfunКод:$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfunКод:$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfunКод:$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:49)
    at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:49)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply(JobScheduler.scala:224)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply(JobScheduler.scala:224)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.mysql.jdbc.JDBC4PreparedStatement

Ответы на вопрос(0)

Ваш ответ на вопрос