Spark Stateful Streaming-Job bleibt nach langer Betriebszeit beim Checkpointing auf S3 hängen

Ich habe kürzlich unsere Spark-Streaming-App einem Stresstest unterzogen. Der Stresstest erfasst ungefähr 20.000 Nachrichten pro Sekunde mit Nachrichtengrößen zwischen 200 Byte und 1 KB in Kafka, wobei Spark Streaming alle 4 Sekunden Batches liest.

Unser Spark-Cluster läuft unter Version 1.6.1 mit Standalone-Cluster-Manager, und wir verwenden Scala 2.10.6 für unseren Code.

Nach etwa 15 bis 20 Stunden wird einer der Executoren, der einen Checkpoint initiiert (im Abstand von 40 Sekunden), mit dem folgenden Stack-Trace blockiert und wird nie abgeschlossen:

java.net.SocketInputStream.socketRead0 (native Methode) java.net.SocketInputStream.socketRead (SocketInputStream.java:116) java.net.SocketInputStream.read (SocketInputStream.java:170) java.net.SocketInputStream.read (SocketInputStream. java: 141) sun.security.ssl.InputRecord.readFully (InputRecord.java:465) sun.security.ssl.InputRecord.readV3Record (InputRecord.java:593) sun.security.ssl.InputRecord.read (InputRecord.java: 532) sun.security.ssl.SSLSocketImpl.readRecord (SSLSocketImpl.java:973) sun.security.ssl.SSLSocketImpl.performInitialHandshake (SSLSocketImpl.java:1375) sun.security.ssl.SSLSocketImpl.startHandshake (SSL:SocketImpl3pl sun.security.ssl.SSLSocketImpl.startHandshake (SSLSocketImpl.java:1387) org.apache.http.conn.ssl.SSLSocketFactory.connectSocket (SSLSocketFactory.java:533) org.apache.http.conn.ssl.SSLSocketFactory.connectSocket ( SSLSocketFactory.java:401) org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection (DefaultClientConnectionOperator.java:177) org.apach e.http.impl.conn.AbstractPoolEntry.open (AbstractPoolEntry.java:144) org.apache.http.impl.conn.AbstractPooledConnAdapter.open (AbstractPooledConnAdapter.java:131) org.apache.http.impl.client.DefaultRequest. tryConnect (DefaultRequestDirector.java:610) org.apache.http.impl.client.DefaultRequestDirector.execute (DefaultRequestDirector.java:445) org.apache.http.impl.client.AbstractHttpClient.doExecute (org.Http. apache.http.impl.client.CloseableHttpClient.execute (CloseableHttpClient.java:82) org.apache.http.impl.client.CloseableHttpClient.execute (CloseableHttpClient.java:57) org.jets3t.service.impl.rest.htp. RestStorageService.performRequest (RestStorageService.java:326) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest (RestStorageService.java:277) org.jets3t.service.impl.rest.httpclient. java: 1038) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl (RestStorageS ervice.java:2250) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl (RestStorageService.java:2179) org.jets3t.service.StorageService.getObjectDetails (StorageService.java:1120) org.java. StorageService.getObjectDetails (StorageService.java:575) org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata (Jets3tNativeFileSystemStore.java:174) sun.reflect.GeneratedMethodAccessor32 .java: 43) java.lang.reflect.Method.invoke (Method.java:497) org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod (RetryInvocationHandler.java:187) org.apache.hadoop.io.retry .RetryInvocationHandler.invoke (RetryInvocationHandler.java:102) org.apache.hadoop.fs.s3native. $ Proxy18.retrieveMetadata (unbekannte Quelle) org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus (NativeS3FileS3FileSystem .apache.hadoop.fs.FileSystem.exists (FileSystem.java:1424) org.apache.spark.rdd.ReliableCheckpointRDD $ .writePartitionToCheckpointFile (ReliableCheckpointRDD.scala: 168) org.apache.spark.rdd.ReliableCheckpointRDD $ anonfun $ writeRDDToCheckpointDirectory $ 1.apply (ReliableCheckpointRD.spark.RD .ReliableCheckpointRDD $ anonfun $ writeRDDToCheckpointDirectory $ 1.apply (ReliableCheckpointRDD.scala: 136) org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 66) org.apache.spark.scheduler.Task.run ( : 89) org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 214) java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) java.lang.Thread.run (Thread.java:745)

Während der Spark-Treiber feststeckt, weigert er sich, die Verarbeitung eingehender Stapel fortzusetzen, und erstellt einen großen Rückstand an in der Warteschlange befindlichen Stapeln, die erst verarbeitet werden können, wenn die "feststeckende" Task freigegeben wird.

Weitere Informationen finden Sie im Treiber-Thread-Dump unterstreaming-job-executor-0 zeigt deutlich, dass es auf den Abschluss dieser Aufgabe wartet:

java.lang.Object.wait (native Methode) java.lang.Object.wait (Object.java:502) org.apache.spark.scheduler.JobWaiter.awaitResult (JobWaiter.scala: 73) org.apache.spark. scheduler.DAGScheduler.runJob (DAGScheduler.scala: 612) org.apache.spark.SparkContext.runJob (SparkContext.scala: 1832) org.apache.spark.SparkContext.runJob (SparkContext.scala: 1845) org.apache.spark. SparkContext.runJob (SparkContext.scala: 1922) org.apache.spark.rdd.ReliableCheckpointRDD $ .writeRDDToCheckpointDirectory (ReliableCheckpointRDD.scala: 135) org.apache.spark.rdd.ReliableRDDCheckpointData.DoCheckpoint (ReliableRDDC) .spark.rdd.RDDCheckpointData.checkpoint (RDDCheckpointData.scala: 74) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1.anwenden $ mcV $ sp (RDD.scala: 1682) org.apache.spark.rdd .RDD $ anonfun $ doCheckpoint $ 1.apply (RDD.scala: 1679) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1.apply (RDD.scala: 1679) org.apache.spark.rdd.RDOperationScope $ .withScope (RDDOperationScope.scala: 150) org.apache.spark. rdd.RDD.doCheckpoint (RDD.scala: 1678) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1 $ anonfun $ wende $ mcV $ sp $ an. 1.apply (RDD.scala: 1684) org.apache. spark.rdd.RDD $ anonfun $ doCheckpoint $ 1 $ anonfun $ apply $ mcV $ sp $ 1.apply (RDD.scala: 1684) scala.collection.immutable.List.foreach (List.scala: 318)

Hat jemand ein solches Problem erlebt?

Antworten auf die Frage(2)

Ihre Antwort auf die Frage