SparkR job 100-minute timeout
I have written a somewhat complex SparkR script and run it with spark-submit. Essentially, it reads a large Hive/Impala Parquet-backed table row by row and creates a new Parquet file with the same number of rows. But the job appears to be killed after exactly 100 minutes, which points to some kind of timeout.
For up to 500K rows the script works fine (since it takes less than 100 minutes). For 1, 2, 3 or more million rows, it is killed after 100 minutes. I have checked and tested every parameter I know of that could hold a value of 100 minutes, but could not find a solution. A minimal sketch of the script's read/write pattern follows.
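Since I cannot share the actual script, here is a minimal sketch of the pattern it follows, with placeholder paths and assuming the Spark 1.5 SparkR API that this cluster runs:

library(SparkR)

# Initialize the contexts Spark 1.5-style; master and memory settings
# come from spark-defaults.conf (see below)
sc <- sparkR.init(appName = "SparkR")
sqlContext <- sparkRSQL.init(sc)

# Read the large Hive/Impala-backed Parquet table (placeholder path)
df <- read.df(sqlContext, "hdfs:///path/to/input_table", source = "parquet")

# ... row-wise transformations elided ...

# Write the result as a new Parquet file with the same number of rows;
# this is the write.df call that fails in the log below
write.df(df, path = "hdfs:///path/to/output", source = "parquet", mode = "overwrite")

sparkR.stop()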
[user@localhost R]$ time spark-submit sparkr-pre.R
Loading required package: methods
Attaching package: ‘SparkR’
The following objects are masked from ‘package:stats’:
filter, na.omit
The following objects are masked from ‘package:base’:
intersect, rbind, sample, subset, summary, table, transform
15/12/30 18:04:27 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
[Stage 1:========================================> (7 + 3) / 10]Error in if (returnStatus != 0) { : argument is of length zero
Calls: write.df -> write.df -> .local -> callJMethod -> invokeJava
Execution halted
15/12/30 19:44:52 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1514)
at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1438)
at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1724)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1723)
at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:587)
at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:264)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:234)
at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1855)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132)
at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79)
at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
15/12/30 19:44:52 ERROR DefaultWriterContainer: Job job_201512301804_0000 aborted.
15/12/30 19:44:52 ERROR RBackendHandler: save on 25 failed
real 100m30.944s
user 1m26.326s
sys 0m19.459s
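Note what the R-side error actually says: the failure is raised inside invokeJava, where SparkR reads a status code over its socket connection to the JVM backend. When the SparkContext is shut down underneath it, that read returns a zero-length value, and R's if() then fails with exactly this message. A two-line illustration (returnStatus here is a stand-in for what SparkR reads internally):

returnStatus <- integer(0)           # what a read from a dead backend connection yields
if (returnStatus != 0) stop("x")     # Error in if (returnStatus != 0) ... : argument is of length zero

So write.df did not hit a data error; the JVM side went away mid-job, which matches the "Job cancelled because SparkContext was shut down" in the trace above.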
Runtime Information (from the Spark UI Environment page)
Name Value
Java Home /usr/java/jdk1.8.0_40/jre
Java Version 1.8.0_40 (Oracle Corporation)
Scala Version version 2.10.4
Spark Properties
Name Value
spark.akka.frameSize 1024
spark.app.id application_1451466100034_0019
spark.app.name SparkR
spark.driver.appUIAddress http://x.x.x.x:4040
spark.driver.host x.x.x.x
spark.driver.maxResultSize 8G
spark.driver.memory 100G
spark.driver.port 60471
spark.executor.id driver
spark.executor.memory 14G
spark.executorEnv.LD_LIBRARY_PATH $LD_LIBRARY_PATH:/usr/lib64/R/lib:/usr/local/lib64:/usr/lib/jvm/jre/lib/amd64/server:/usr/lib/jvm/jre/lib/amd64:/usr/lib/jvm/java/lib/amd64:/usr/java/packages/lib/amd64:/lib:/usr/lib::/usr/lib/hadoop/lib/native
spark.externalBlockStore.folderName spark-b60f685e-c46c-435d-ab1b-c9d1279f630f
spark.fileserver.uri http://x.x.x.x:50281
spark.home /datas/spark-1.5.2-bin-hadoop2.6
spark.kryoserializer.buffer.max 2000M
spark.master yarn-client
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS CDHPR1.dc.dialog.lk
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES http://CDHPR1.dc.dialog.lk:8088/proxy/application_1451466100034_0019
spark.scheduler.mode FIFO
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.parquet.binaryAsString true
spark.submit.deployMode client
spark.ui.filters org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.yarn.dist.archives file:/datas/spark-1.5.2-bin-hadoop2.6/R/lib/sparkr.zip#sparkr
spark.yarn.dist.files file:/home/inuser/R/sparkr-pre.R
System Properties
Name Value
SPARK_SUBMIT true
SPARK_YARN_MODE true
awt.toolkit sun.awt.X11.XToolkit
file.encoding UTF-8
file.encoding.pkg sun.io
file.separator /
java.awt.graphicsenv sun.awt.X11GraphicsEnvironment
java.awt.printerjob sun.print.PSPrinterJob
java.class.version 52.0
java.endorsed.dirs /usr/java/jdk1.8.0_40/jre/lib/endorsed
java.ext.dirs /usr/java/jdk1.8.0_40/jre/lib/ext:/usr/java/packages/lib/ext
java.home /usr/java/jdk1.8.0_40/jre
java.io.tmpdir /tmp
java.library.path :/usr/lib/hadoop/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.runtime.name Java(TM) SE Runtime Environment
java.runtime.version 1.8.0_40-b26
java.specification.name Java Platform API Specification
java.specification.vendor Oracle Corporation
java.specification.version 1.8
java.vendor Oracle Corporation
java.vendor.url http://java.oracle.com/
java.vendor.url.bug http://bugreport.sun.com/bugreport/
java.version 1.8.0_40
java.vm.info mixed mode
java.vm.name Java HotSpot(TM) 64-Bit Server VM
java.vm.specification.name Java Virtual Machine Specification
java.vm.specification.vendor Oracle Corporation
java.vm.specification.version 1.8
java.vm.vendor Oracle Corporation
java.vm.version 25.40-b25
line.separator
os.arch amd64
os.name Linux
os.version 2.6.32-431.el6.x86_64
path.separator :
sun.arch.data.model 64
sun.boot.class.path /usr/java/jdk1.8.0_40/jre/lib/resources.jar:/usr/java/jdk1.8.0_40/jre/lib/rt.jar:/usr/java/jdk1.8.0_40/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_40/jre/lib/jsse.jar:/usr/java/jdk1.8.0_40/jre/lib/jce.jar:/usr/java/jdk1.8.0_40/jre/lib/charsets.jar:/usr/java/jdk1.8.0_40/jre/lib/jfr.jar:/usr/java/jdk1.8.0_40/jre/classes
sun.boot.library.path /usr/java/jdk1.8.0_40/jre/lib/amd64
sun.cpu.endian little
sun.cpu.isalist
sun.io.unicode.encoding UnicodeLittle
sun.java.command org.apache.spark.deploy.SparkSubmit sparkr-pre.R
sun.java.launcher SUN_STANDARD
sun.jnu.encoding UTF-8
sun.management.compiler HotSpot 64-Bit Tiered Compilers
sun.nio.ch.bugLevel
sun.os.patch.level unknown
user.country US
user.dir /home/user/R
user.home /home/user
user.language en
user.name inuser
user.timezone Asia/Colombo
Classpath Entries
Resource Source
/datas/spark-1.5.2-bin-hadoop2.6/conf/ System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/conf/yarn-conf/ System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar System Classpath
spark-defaults.conf
# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.
# Example:
# spark.master spark://master:7077
# spark.eventLog.enabled true
# spark.eventLog.dir hdfs://namenode:8021/directory
# spark.serializer org.apache.spark.serializer.KryoSerializer
# spark.driver.memory 5g
# spark.executor.extraJavaOptions -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
#
spark.master yarn-client
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.memory 100G
spark.executor.memory 14G
spark.sql.parquet.binaryAsString true
spark.kryoserializer.buffer.max 2000M
spark.driver.maxResultSize 8G
spark.akka.frameSize 1024
#spark.executor.instances 16
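For context, Spark 1.5 does expose timeout-related properties of this kind (documented defaults shown for illustration only; none of them is set in the conf above, and none defaults to 100 minutes):

# spark.network.timeout             120s
# spark.rpc.askTimeout              (defaults to spark.network.timeout)
# spark.akka.timeout                100s
# spark.executor.heartbeatInterval  10s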
I cannot share the SparkR script publicly; I'm really sorry. However, the code works fine whenever it takes less than 100 minutes.