Prepare a batch statement to store an entire RDD, generated from spark-streaming, into MySQL
I am trying to insert the batch RDDs generated from a DStream by Spark Streaming into MySQL. The following code works fine, but the problem is that I am creating a connection to store each tuple. To avoid that, I created the connection outside foreachRDD, but it gave me the error below:
Code:
import java.sql.DriverManager

realTimeAgg.foreachRDD { x =>
  if (x.toLocalIterator.nonEmpty) {
    x.foreachPartition { it =>
      // One connection per partition, opened on the executor
      val conn = DriverManager.getConnection(
        "jdbc:mysql://IP:Port/DbName", "UserName", "Password")
      val insertData = conn.prepareStatement(
        "INSERT INTO MySqlTable (col1, col2, col3, col4) VALUES (?,?,?,?)")
      for (tuple <- it) {
        insertData.setLong(1, tuple._1._3)
        insertData.setString(2, tuple._1._1)
        insertData.setString(3, tuple._1._2)
        insertData.setLong(4, tuple._2)
        insertData.executeUpdate() // one round trip per tuple
      }
      conn.close()
    }
  }
}
Error:
com.typesafe.config.ConfigException$BugOrBroken: com.typesafe.config.impl.SerializedConfigValue should not exist outside of serialization
at com.typesafe.config.impl.SerializedConfigValue.shouldNotBeUsed(SerializedConfigValue.java:471)
at com.typesafe.config.impl.SerializedConfigValue.unwrapped(SerializedConfigValue.java:482)
at com.typesafe.config.impl.AbstractConfigValue.hashCode(AbstractConfigValue.java:272)
at scala.collection.mutable.FlatHashTable$HashUtils$class.elemHashCode(FlatHashTable.scala:391)
at scala.collection.mutable.HashSet.elemHashCode(HashSet.scala:41)
at scala.collection.mutable.FlatHashTable$class.findEntryImpl(FlatHashTable.scala:123)
at scala.collection.mutable.FlatHashTable$class.containsEntry(FlatHashTable.scala:119)
at scala.collection.mutable.HashSet.containsEntry(HashSet.scala:41)
at scala.collection.mutable.HashSet.contains(HashSet.scala:58)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:87)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:159)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:206)
at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:108)
at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:67)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1.apply(RDD.scala:919)
at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at Streamingjob$anonfun$main$1.apply(StreamingJob.scala:75)
at Streamingjob$anonfun$main$1.apply(StreamingJob.scala:74)
at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
16/05/26 19:08:50 INFO JobScheduler: Finished job streaming job 1464269930000 ms.0 from job set of time 1464269930000 ms
16/05/26 19:08:50 INFO JobScheduler: Total delay: 0.332 s for time 1464269930000 ms (execution: 0.305 s)
16/05/26 19:08:50 ERROR JobScheduler: Error running job streaming job 1464269930000 ms.0
org.apache.spark.SparkException: Task not serializable
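The trace bottoms out in Task not serializable because a java.sql.Connection cannot be serialized on the driver and shipped to the executors, which is what hoisting it outside foreachRDD requires. One common workaround (a minimal sketch, not part of the original code; the placeholders match the snippet above) is a singleton object whose connection is initialized lazily on each executor JVM, so the connection itself never crosses the wire:

import java.sql.{Connection, DriverManager}

// Hypothetical helper: the lazy val runs on first use inside a task,
// i.e. on the executor, so the Connection is never serialized.
object ExecutorMySqlConnection {
  lazy val conn: Connection =
    DriverManager.getConnection(
      "jdbc:mysql://IP:Port/DbName", "UserName", "Password")
}

Referencing ExecutorMySqlConnection.conn inside foreachPartition then reuses one connection per executor across all batches, instead of opening one per partition.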
Also, to optimize the code, I wanted to prepare a batch statement that could be executed once per batch instead of inserting each tuple over and over. But again it gave me the following error. So can someone tell me how to prepare the batch statement while also creating the connection only once? (A batching sketch follows the stack trace at the end of this post.)
Also, how can I supply the connection strings of two MySQL nodes so that, if one goes down, it switches to the failover connection string? (A multi-host URL sketch also follows below.)
Error:
ERROR JobScheduler: Error running job streaming job 1464269590000 ms.0
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1.apply(RDD.scala:919)
at org.apache.spark.rdd.RDD$anonfun$foreachPartition$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at Streamingjob$anonfun$main$1.apply(StreamingJob.scala:78)
at Streamingjob$anonfun$main$1.apply(StreamingJob.scala:77)
at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$anonfun$foreachRDD$1$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply(JobScheduler.scala:224)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$anonfun$run$1.apply(JobScheduler.scala:224)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.mysql.jdbc.JDBC4PreparedStatement
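On the batching question: the final Caused by line shows why this attempt failed. A PreparedStatement is not serializable either, so it must be created inside foreachPartition, not outside it. Given that, batching amounts to replacing the per-tuple executeUpdate() with addBatch() plus one executeBatch() per partition. A minimal sketch against the same tuple shape as the code above:

realTimeAgg.foreachRDD { x =>
  x.foreachPartition { it =>
    val conn = DriverManager.getConnection(
      "jdbc:mysql://IP:Port/DbName", "UserName", "Password")
    val insertData = conn.prepareStatement(
      "INSERT INTO MySqlTable (col1, col2, col3, col4) VALUES (?,?,?,?)")
    for (tuple <- it) {
      insertData.setLong(1, tuple._1._3)
      insertData.setString(2, tuple._1._1)
      insertData.setString(3, tuple._1._2)
      insertData.setLong(4, tuple._2)
      insertData.addBatch()   // queue the row locally
    }
    insertData.executeBatch() // send all queued rows in one round trip
    conn.close()
  }
}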
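On the failover question: MySQL Connector/J accepts a multi-host URL, so listing both nodes lets the driver fall back to the second host when the first is unreachable. A sketch, with placeholder host names:

val conn = DriverManager.getConnection(
  "jdbc:mysql://primaryHost:3306,failoverHost:3306/DbName?failOverReadOnly=false",
  "UserName", "Password")

Setting failOverReadOnly=false keeps the connection writable after a failover; by default Connector/J puts a failed-over connection into read-only mode.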