Wie man Byte [] [] mit Kryo-Serialisierung für Funken registriert

Ich versuche, die Kryo-Serialisierung für Funken vollständig zu nutzen. Rahme

.set("spark.kryo.registrationRequired", "true")

Dies teilt mir mit, welche Klassen registriert werden müssen. Ich habe ungefähr 40 Klassen angemeldet, einige meiner Klassen und einige der Spark-Klassen. Ich folgte Kryo-Serialisierung in Spark (Scala) anfordern post um sich zu registrieren / alles einzurichten.

Ich stoße jetzt auf Folgendes und kann nicht herausfinden, wie ich es in Scala registrieren soll. Hat jemand dieses Problem gelöst?

Ich habe verschiedene Kombinationen ausprobiert, darunter:

kryo.register(classOf[Array[Array[Byte]]])
conf.set("classesToRegister", "classOf[Array[Array[Byte]]]")
conf.registerKryoClasses(Array(classOf[Array[Array[Byte]]]))

Ich habe einen unbeantworteten Beitrag gefundenhttps: //mail-archives.apache.org/mod_mbox/spark-user/201603.mbox/%3CCAHCfvsSyUpx78ZFS_A9ycxvtO1=Jp7DfCCAeJKHyHZ1sugqHEQ@mail.gmail.com%3 unter Angabe des gleichen Problems.

java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.lang.IllegalArgumentException: Class is not registered: byte[][]
Note: To register this class use: kryo.register(byte[][].class);
Serialization trace:
buffers (org.apache.spark.sql.columnar.CachedBatch)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:158)
at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1190)
at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1199)
at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:191)
at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:480)
at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
at org.apache.spark.network.netty.NettyBlockRpcServer$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.netty.NettyBlockRpcServer$anonfun$2.apply(NettyBlockRpcServer.scala:57)
at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
at org.apache.spark.network.,server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

Antworten auf die Frage(2)

Ihre Antwort auf die Frage