El trabajo de transmisión con estado chispeante se cuelga en el punto de control a S3 después de un largo tiempo de actividad

Recientemente he estado haciendo pruebas de estrés con nuestra aplicación Spark Streaming. La prueba de esfuerzo consume alrededor de 20,000 mensajes / seg con tamaños de mensaje que varían entre 200bytes - 1K en Kafka, donde Spark Streaming está leyendo lotes cada 4 segundos.

Nuestro clúster Spark se ejecuta en la versión 1.6.1 con el administrador de clúster independiente y estamos utilizando Scala 2.10.6 para nuestro código.

Después de aproximadamente 15-20 horas de ejecución, uno de los ejecutores que, h está iniciando un punto de control (hecho en un intervalo de 40 segundos) está atascado con el siguiente seguimiento de la pila y nunca se completa:

java.net.SocketInputStream.socketRead0 (método nativo) java.net.SocketInputStream.socketRead (SocketInputStream.java:116) java.net.SocketInputStream.read (SocketInputStream.java:170) java.net.SocketInputStream.read (SocketInputSputam. : 141) sun.security.ssl.InputRecord.readFully (InputRecord.java:465) sun.security.ssl.InputRecord.readV3Record (InputRecord.java:593) sun.security.ssl.InputRecord.read (InputRecord.java:532 ) sun.security.ssl. .security.ssl.SSLSocketImpl.startHandshake (SSLSocketImpl.java:1387) org.apache.http. .java: 401) org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection (DefaultClientConnectionOperator.java:177) org.apache .http.impl.conn.AbstractPoolEntry.open (AbstractPoolEntry.java:144) org.apache.http.impl.conn.AbstractPooledConnAdapter.open (AbstractPooledConnAdapter.java:131) org.apache.http.implRec.ctient.Detalles.client. (DefaultRequestDirector.java:610) org.apache.http.impl.client.DefaultRequestDirector.execute (DefaultRequestDirector.java:445) org.apache.http.impl.client.AbstractHttpClient.doExecute (AbstractHttpClient.java:863) .http.impl.client.CloseableHttpClient.execute (CloseableHttpClient.java:82) org.apache.http.impl.client.CloseableHttpClient.execute (CloseableHttpClient.java:57) org.jets3t.service.impl.rest.httpclient.Rest (. : 1038) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl (RestStorageSe rvice.java:2250) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl (RestStorageService.java:2179) org.jets3t.service.StorageService.getObjectDetails (StorageService.ja.:jets. .Interiorización. .java: 43) java.lang.reflect.Method.invoke (Method.java:497) org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod (RetryInvocationHandler.java:187) org.apache.hadoop.io.retry .RetryInvocationHandler.invoke (RetryInvocationHandler.java:102) org.apache.hadoop.fs.s3native. $ Proxy18.retrieveMetadata (Fuente desconocida) org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus (NativeS3.org) .apache.hadoop.fs.FileSystem.exists (FileSystem.java:1424) org.apache.spark.rdd.ReliableCheckpointRDD $ .writePartitionToCheckpointFile (ReliableCheckpointRDD.scala: 168) org.apache.spark.rdd.ReliableCheckpointRDD $ anonfun $ writeRDDToCheckpointDirectory $. .ReliableCheckpointRDD $ anonfun $ writeRDDToCheckpointDirectory $ 1.apply (ReliableCheckpointRDD.scala: 136) org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 66) org.apache.unTask.schek.spark.skr.spark.sk.sp : 89) org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 214) java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) java.lang.Thread.run (Thread.java:745)

Mientras está atascado, el generador de chispas se niega a continuar procesando lotes entrantes y crea una enorme acumulación de lotes en cola que no se pueden procesar hasta liberar la tarea que está "atascada".

Más aún, mirando el volcado de subprocesos del controlador enstreaming-job-executor-0 muestra claramente que está esperando que se complete esta tarea:

java.lang.Object.wait (Método nativo) java.lang.Object.wait (Object.java:502) org.apache.spark.scheduler.JobWaiter.awaitResult (JobWaiter.scala: 73) org.apache.spark.scheduler .DAGScheduler.runJob (DAGScheduler.scala: 612) org.apache.spark.SparkContext.runJob (SparkContext.scala: 1832) org.apache.spark.SparkContext.runJob (SparkContext.scala: 1845) org.apache.spark.SparkContext .runJob (SparkContext.scala: 1922) org.apache.spark.rdd.ReliableCheckpointRDD $ .writeRDDToCheckpointDirectory (ReliableCheckpointRDD.scala: 135) org.apache.spark.rdd.ReliableRDDCheckpointData.check.check. spark.rdd.RDDCheckpointData.checkpoint (RDDCheckpointData.scala: 74) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1.apply $ mcV $ sp (RDD.scala: 1682) org.apache.spark.rdd. RDD $ anonfun $ doCheckpoint $ 1.apply (RDD.scala: 1679) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1.apply (RDD.scala: 1679) org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 150) org.apache.spark.r dd.RDD.doCheckpoint (RDD.scala: 1678) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1 $ anonfun $ apply $ mcV $ sp $ 1.apply (RDD.scala: 1684) org.apache. spark.rdd.RDD $ anonfun $ doCheckpoint $ 1 $ anonfun $ apply $ mcV $ sp $ 1.apply (RDD.scala: 1684) scala.collection.immutable.List.foreach (List.scala: 318)

¿Alguien ha experimentado tal problema?

Respuestas a la pregunta(1)

Su respuesta a la pregunta