Spark Stateful Streaming Job зависает на контрольной точке на S3 после длительного времени безотказной работы

Недавно я проводил стресс-тестирование нашего приложения Spark Streaming. Стресс-тестирование потребляет около 20 000 сообщений в секунду с размерами сообщений, варьирующимися от 200 байт до 1 Кб, в Kafka, где Spark Streaming считывает пакеты каждые 4 секунды.

Наш кластер Spark работает под управлением версии 1.6.1 с автономным диспетчером кластеров, и мы используем Scala 2.10.6 для нашего кода.

Примерно через 15-20 часов работы один из исполнителей, чей h запускает контрольную точку (выполняется с интервалом в 40 секунд), привязывается к следующей трассировке стека и никогда не завершается:

java.net.SocketInputStream.socketRead0 (собственный метод) java.net.SocketInputStream.socketRead (SocketInputStream.java:116) java.net.SocketInputStream.read (SocketInputStream.java:170) java.net.SocketInputStream.read (SocketIn) : 141) sun.security.ssl.InputRecord.readFully (InputRecord.java:465) sun.security.ssl.InputRecord.readV3Record (InputRecord.java:593) sun.security.ssl.InputRecord.read (InputRecord.java:532) ) sun.security.ssl. SSLSocketImpl.readRecord (SSLSocketImpl.java:973) .security.ssl.SSLSocketImpl.startHandshake (SSLSocketImpl.java:1387) org.apache.http.conn.ssl.SSLSocketFactory.connectSocket (SSLSocketFactory.java:533) org.apache.http.conLSocket.FoLS.coLSFF .java: 401) org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection (DefaultClientConnectionOperator.java:177) org.apache .http.impl.conn.AbstractPoolEntry.open (AbstractPoolEntry.java:144) org.apache.http.impl.conn.AbstractPooledConnAdapter.open (AbstractPooledConnAdapter.java:131) org.apache.http.dll (DefaultRequestDirector.java:610) org.apache.http.impl.client.DefaultRequestDirector.execute (DefaultRequestDirector.java:445) org.apache.http.impl.client.AbstractHttpClient.doExecute (AbstractH) .http.impl.client.CloseableHttpClient.execute (CloseableHttpClient.java:82) org.apache.http.impl.client.CloseableHttpClient.execute (CloseableHttpClient.java:57) org.jets3l.tlient.serviceShtvice .performRequest (RestStorageService.java:326) org.jets3t.service.impl.rest.httpclient. : 1038) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl (RestStorageSe rvice.java:2250) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl (RestStorageService.java:2179) org.jets3t.service. StorageService.getObjectDetails (StorageService.java:575) org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata (Jets3tNativeFileSystemStore.java:174) sun.reflect. .java: 43) java.lang.reflect.Method.invoke (Method.java:497) org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod (RetryInvocationHandler.java:187) org.apache.hadoop.io. .RetryInvocationHandler.invoke (RetryInvocationHandler.java:102) org.apache.hadoop.fs.s3native. $ Proxy18.retrieveMetadata (Неизвестный источник) org.apache.hadoop.fs.s3native.NativeS3FileSystem.get2.SaSF ( .apache.hadoop.fs.FileSystem.exists (FileSystem.java:1424) org.apache.spark.rdd.ReliableCheckpointRDD $ .writePartitionToCheckpointFile (ReliableCheckpointRDD.scala: 168) org.apache.spark.rdd. .ReliableCheckpointRDD $ anonfun $ writeRDDToCheckpointDirectory $ 1 : 89) org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 214) java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolrecutor $ (ThreadPoolExecutor.java:617) java.lang.Thread.run (Thread.java:745)

Будучи застрявшим, драйвер искры отказывается продолжать обработку входящих пакетов и создает огромное отставание в очередях, которые не могут быть обработаны, пока не будет отпущена задача, которая «застряла».

Более того, глядя на дамп потока драйверов подstreaming-job-executor-0 наглядно показывает, что ждет выполнения этой задачи:

java.lang.Object.wait (собственный метод) java.lang.Object.wait (Object.java:502) org.apache.spark.scheduler.JobWaiter.awaitResult (JobWaiter.scala: 73) org.apache.spark.scheduler .DAGScheduler.runJob (DAGScheduler.scala: 612) org.apache.spark.SparkContext.runJob (SparkContext.scala: 1832) org.apache.spark.SparkContext.runJob (SparkContext.scala: 1845) org.ap .runJob (SparkContext.scala: 1922) org.apache.spark.rdd.ReliableCheckpointRDD $ .writeRDDToCheckpointDirectory (ReliableCheckpointRDD.scala: 135) org.apache.spark.rdd. spark.rdd.RDDCheckpointData.checkpoint (RDDCheckpointData.scala: 74) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1.apply $ mcV $ sp (RDD.scala: 1682) org.apache.spark.rdd. RDD $ anonfun $ doCheckpoint $ 1.apply (RDD.scala: 1679) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1.apply (RDD.scala: 1679) org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 150) org.apache.spark.r dd.RDD.doCheckpoint (RDD.scala: 1678) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1 $ anonfun $ apply $ mcV $ sp $ 1.apply (RDD.scala: 1684) org.apache. spark.rdd.RDD $ anonfun $ doCheckpoint $ 1 $ anonfun $ apply $ mcV $ sp $ 1.apply (RDD.scala: 1684) scala.collection.immutable.List.foreach (List.scala: 318)

Кто-нибудь сталкивался с такой проблемой?

Ответы на вопрос(1)

Ваш ответ на вопрос