O trabalho de streaming estável do Spark trava no ponto de verificação para o S3 após um longo tempo de atividade

Estive recentemente testando o nosso aplicativo Spark Streaming. O teste de estresse ingere cerca de 20.000 mensagens / s com tamanhos de mensagens variando entre 200 bytes - 1 K em Kafka, onde o Spark Streaming está lendo lotes a cada 4 segundos.

Nosso cluster Spark é executado na versão 1.6.1 com o gerenciador de cluster independente e estamos usando o Scala 2.10.6 para o nosso código.

Após uma execução de 15 a 20 horas, um dos executores que h está iniciando um ponto de verificação (feito em um intervalo de 40 segundos) fica preso no seguinte rastreamento de pilha e nunca é concluído:

java.net.SocketInputStream.socketRead0 (método nativo) java.net.SocketInputStream.socketRead (SocketInputStream.java:116) java.net.SocketInputStream.read (SocketInputStream.java:170) java.net.SocketInputStream.read (SocketInputStream.java : 141) sun.security.ssl.InputRecord.readFully (InputRecord.java:465) sun.security.ssl.InputRecord.readV3Record (InputRecord.java:593) sun.security.ssl.InputRecord.read (InputRecord.java:532) ) .Sol.security.ssl.SSLSocketImpl.readRecord (SSLSocketImpl.java:973) sun.security.ssl.SSLSocketImpl.performInitialHandshake .security.ssl.SSLSocketImpl.startHandshake (SSLSocketImpl.java:1387) org.apache.http.conn.ssl.SSLSocketFactory.connectSocket (SSLSocketFactory.java:533) org.apache.http.conn.ssl.SSLSocketFactory.connect .java: 401) org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection (DefaultClientConnectionOperator.java:177) org.apache .http.impl.conn.AbstractPoolEntry.open (AbstractPoolEntry.java:144) org.apache.http.impl.conn.AbstractPooledConnAdapter.open (AbstractPooledConnAdapter.java:131) org.apache.http.impl.client.DefaultRequestDirector (DefaultRequestDirector.java:610) org.apache.http.impl.client.DefaultRequestDirector.execute (DefaultRequestDirector.java:445) org.apache.http.impl.client.AbstractHttpClient.doExecute (AbstractHttpClient.java:863) org.apache .http.impl.client.CloseableHttpClient.execute (CloseableHttpClient.java:82) org.apache.http.impl.client.CloseableHttpClient.execute (CloseableHttpClient.java:57) org.jets3t.service.impl.rest.htor .performRequest (RestStorageService.java:326) org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest (RestStorageService.java:277) org.jets3t.service.impl.rest.httpclient.RestStorageService. : 1038) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl (RestStorageSe rvice.java:2250) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl (RestStorageService.java:2179) org.jets3t.service.StorageService.getObjectDetails (StorageService.java:203). StorageService.getObjectDetails (StorageService.java:575) org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata (Jets3tNativeFileSystemStore.java:174) sun.reflect.GeneratedMethodAccessor32.influite .java: 43) java.lang.reflect.Method.invoke (Method.java:497) org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod (RetryInvocationHandler.java:187) org.apache.hadoop.io.retry .RetryInvocationHandler.invoke (RetryInvocationHandler.java:102) org.apache.hadoop.fs.s3native. $ Proxy18.retrieveMetadata (fonte desconhecida) org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus (NativeS3FileSystem.java:10) .apache.hadoop.fs.FileSystem.exists (FileSystem.java:1424) org.apache.spark.rdd.ReliableCheckpointRDD $ .writePartitionToCheckpointFile (ReliableCheckpointRDD.scala: 168) org.apache.spark.rdd.ReliableCheckpointRDD $ anonfun $ writeRDDToCheckpointDirectory $ 1.apply (ReliableCheckpointRDD.scala: 168) .ReliableCheckpointRDD $ anonfun $ writeRDDToCheckpointDirectory $ 1.apply (ReliableCheckpointRDD.scala: 136) org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala: 66) org.apache.spark.scheduler.Task.run (Task : 89) org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 214) java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) java.lang.Thread.run (Thread.java:745)

Enquanto está travado, o driver spark se recusa a continuar processando lotes recebidos e cria uma enorme lista de pendências de lotes em fila que não podem ser processados até liberar a tarefa "travada".

Além disso, observando o despejo de threads do driver emstreaming-job-executor-0 mostra claramente que está aguardando a conclusão desta tarefa:

java.lang.Object.wait (método nativo) java.lang.Object.wait (Object.java:502) org.apache.spark.scheduler.JobWaiter.awaitResult (JobWaiter.scala: 73) org.apache.spark.scheduler .DAGScheduler.runJob (DAGScheduler.scala: 612) org.apache.spark.SparkContext.runJob (SparkContext.scala: 1832) org.apache.spark.SparkContext.runJob (SparkContext.scala: 1845) org.apache.spark.SparkContext .runJob (SparkContext.scala: 1922) org.apache.spark.rdd.ReliableCheckpointRDD $ .writeRDDToCheckpointDirectory (ReliableCheckpointRDD.scala: 135) org.apache.spark.rdd.ReliableRDDCheckpointData.doCheckpoint (58). spark.rdd.RDDCheckpointData.checkpoint (RDDCheckpointData.scala: 74) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1.apply $ mcV $ sp (RDD.scala: 1682) org.apache.spark.rdd. RDD $ anonfun $ doCheckpoint $ 1.apply (RDD.scala: 1679) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1.apply (RDD.scala: 1679) org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 150) org.apache.spark.r dd.RDD.doCheckpoint (RDD.scala: 1678) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1 $ anonfun $ apply $ mcV $ sp $ 1.apply (RDD.scala: 1684) org.apache. spark.rdd.RDD $ anonfun $ doCheckpoint $ 1 $ anonfun $ apply $ mcV $ sp $ 1.apply (RDD.scala: 1684) scala.collection.immutable.List.foreach (List.scala: 318)

Alguém já experimentou esse problema?

questionAnswers(1)

yourAnswerToTheQuestion