Spark Stateful Streaming Job зависает на контрольной точке на S3 после длительного времени безотказной работы

Недавно я проводил стресс-тестирование нашего приложения Spark Streaming. Стресс-тестирование потребляет около 20 000 сообщений в секунду с размерами сообщений, варьирующимися от 200 байт до 1 Кб, в Kafka, где Spark Streaming считывает пакеты каждые 4 секунды.

Наш кластер Spark работает под управлением версии 1.6.1 с автономным диспетчером кластеров, и мы используем Scala 2.10.6 для нашего кода.

Примерно через 15-20 часов работы один из исполнителей, чей h запускает контрольную точку (выполняется с интервалом в 40 секунд), привязывается к следующей трассировке стека и никогда не завершается:

java.net.SocketInputStream.socketRead0 (собственный метод) java.net.SocketInputStream.socketRead (SocketInputStream.java:116) java.net.SocketInputStream.read (SocketInputStream.java:170) java.net.SocketInputStream.read (SocketIn) : 141) sun.security.ssl.InputRecord.readFully (InputRecord.java:465) sun.security.ssl.InputRecord.readV3Record (InputRecord.java:593) sun.security.ssl.InputRecord.read (InputRecord.java:532) ) sun.security.ssl. SSLSocketImpl.readRecord (SSLSocketImpl.java:973) .security.ssl.SSLSocketImpl.startHandshake (SSLSocketImpl.java:1387) org.apache.http.conn.ssl.SSLSocketFactory.connectSocket (SSLSocketFactory.java:533) org.apache.http.conLSocket.FoLS.coLSFF .java: 401) org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection (DefaultClientConnectionOperator.java:177) org.apache .http.impl.conn.AbstractPoolEntry.open (AbstractPoolEntry.java:144) org.apache.http.impl.conn.AbstractPooledConnAdapter.open (AbstractPooledConnAdapter.java:131) org.apache.http.dll (DefaultRequestDirector.java:610) org.apache.http.impl.client.DefaultRequestDirector.execute (DefaultRequestDirector.java:445) org.apache.http.impl.client.AbstractHttpClient.doExecute (AbstractH) .http.impl.client.CloseableHttpClient.execute (CloseableHttpClient.java:82) org.apache.http.impl.client.CloseableHttpClient.execute (CloseableHttpClient.java:57) org.jets3l.tlient.serviceShtvice .performRequest (RestStorageService.java:326) org.jets3t.service.impl.rest.httpclient. : 1038) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl (RestStorageSe rvice.java:2250) org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl (RestStorageService.java:2179) org.jets3t.service. StorageService.getObjectDetails (StorageService.java:575) org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata (Jets3tNativeFileSystemStore.java:174) sun.reflect. .java: 43) java.lang.reflect.Method.invoke (Method.java:497) org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod (RetryInvocationHandler.java:187) org.apache.hadoop.io. .RetryInvocationHandler.invoke (RetryInvocationHandler.java:102) org.apache.hadoop.fs.s3native. $ Proxy18.retrieveMetadata (Неизвестный источник) org.apache.hadoop.fs.s3native.NativeS3FileSystem.get2.SaSF ( .apache.hadoop.fs.FileSystem.exists (FileSystem.java:1424) org.apache.spark.rdd.ReliableCheckpointRDD $ .writePartitionToCheckpointFile (ReliableCheckpointRDD.scala: 168) org.apache.spark.rdd. .ReliableCheckpointRDD $ anonfun $ writeRDDToCheckpointDirectory $ 1 : 89) org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala: 214) java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolrecutor $ (ThreadPoolExecutor.java:617) java.lang.Thread.run (Thread.java:745)

Будучи застрявшим, драйвер искры отказывается продолжать обработку входящих пакетов и создает огромное отставание в очередях, которые не могут быть обработаны, пока не будет отпущена задача, которая «застряла».

Более того, глядя на дамп потока драйверов подstreaming-job-executor-0 наглядно показывает, что ждет выполнения этой задачи:

java.lang.Object.wait (собственный метод) java.lang.Object.wait (Object.java:502) org.apache.spark.scheduler.JobWaiter.awaitResult (JobWaiter.scala: 73) org.apache.spark.scheduler .DAGScheduler.runJob (DAGScheduler.scala: 612) org.apache.spark.SparkContext.runJob (SparkContext.scala: 1832) org.apache.spark.SparkContext.runJob (SparkContext.scala: 1845) org.ap .runJob (SparkContext.scala: 1922) org.apache.spark.rdd.ReliableCheckpointRDD $ .writeRDDToCheckpointDirectory (ReliableCheckpointRDD.scala: 135) org.apache.spark.rdd. spark.rdd.RDDCheckpointData.checkpoint (RDDCheckpointData.scala: 74) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1.apply $ mcV $ sp (RDD.scala: 1682) org.apache.spark.rdd. RDD $ anonfun $ doCheckpoint $ 1.apply (RDD.scala: 1679) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1.apply (RDD.scala: 1679) org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala: 150) org.apache.spark.r dd.RDD.doCheckpoint (RDD.scala: 1678) org.apache.spark.rdd.RDD $ anonfun $ doCheckpoint $ 1 $ anonfun $ apply $ mcV $ sp $ 1.apply (RDD.scala: 1684) org.apache. spark.rdd.RDD $ anonfun $ doCheckpoint $ 1 $ anonfun $ apply $ mcV $ sp $ 1.apply (RDD.scala: 1684) scala.collection.immutable.List.foreach (List.scala: 318)

Кто-нибудь сталкивался с такой проблемой?

 Yuval Itzchakov27 июл. 2016 г., 10:25
@eliasah Нет, это не EMR. Это кластер Spark, который я вручную настроил для работы с автономным диспетчером кластеров.
 Yuval Itzchakov27 июл. 2016 г., 11:21
@ zero323 Я не вижу сходства. Проблема была в том, что он видел тайм-ауты и исключения, в данном случае это полная зависание всей работы.
 zero32327 июл. 2016 г., 11:19
Это чем-то похоже наstackoverflow.com/q/34879092/1560062не так ли?
 eliasah27 июл. 2016 г., 10:21
Используете ли вы AWS EMR или настраиваемый управляемый кластер на EC2? Похоже, что EMR не подходит для длительной стрессовой работы, так как состояние кластера Hadoop со временем ухудшается.media.amazonwebservices.com/AWS_Amazon_EMR_Best_Practices.pdf страница 27-28

Ответы на вопрос(1)

Решение Вопроса

Зависание сокета происходит из-за ошибки вHttpClient библиотека используетсяorg.jets3t где рукопожатие SSL не использует указанное время ожидания. Вы можете найти детали проблемыВот.

Эта ошибка воспроизводится в версиях HttpClient ниже v4.5.1, где она была исправлена. К сожалению, Spark 1.6.x использует v4.3.2, который не имеет поставленного исправления.

Есть три возможных обходных пути, о которых я подумал:

Используйте механизм спекуляции Спарка черезspark.speculation настройки конфигурации. Это помогает в крайних случаях зависания, поскольку оно воспроизводится редко и под нагрузкой. Обратите внимание, что это может вызвать некоторые ложные срабатывания в начале потоковой работы, когда у spark нет хорошего представления о том, как долго выполняется ваше медианное задание, но это определенно не то, что вызывает заметную задержку.

В документации сказано:

Если установлено значение «true», выполняет спекулятивное выполнение задач. Это означает, что если одна или несколько задач на этапе выполняются медленно, они будут перезапущены.

Вы включаете его, предоставляя флаги для spark-submit:

spark-submit  \
--conf "spark.speculation=true" \
--conf "spark.speculation.multiplier=5" \

Для получения дополнительной информации о различных настройках вы можете пройти, см.Конфигурация искры страница

Передача вручную HttpClient v4.5.1 или выше в classpath Sparks, чтобы он мог загрузить этот JAR до того, как он был в его Uber JAR. Это может быть немного сложно, так как процесс загрузки классов с помощью Spark немного громоздок. Это означает, что вы можете сделать что-то вроде:

CP=''; for f in /path/to/httpcomponents-client-4.5.2/lib/*.jar; do CP=$CP$f:; done
SPARK_CLASSPATH="$CP" sbin/start-master.sh   # on your master machine
SPARK_CLASSPATH="$CP" sbin/start-slave.sh 'spark://master_name:7077' 

Или просто обновите конкретную версию JAR доSPARK_CLASSPATH вspark-env.sh.

Обновление доSpark 2.0.0, В новой версии Spark используется HttpClient v4.5.2, который решает эту проблему.

Ваш ответ на вопрос