java.lang.OutOfMemoryError in pyspark

Hy,

Ich habe einen Datenrahmen in einem Sparkcontext mit 400.000 Zeilen und 3 Spalten. Der Treiber verfügt über 143,5 Speicherplätze

16/03/21 19:52:35 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55613 with 143.5 GB RAM, BlockManagerId(driver, localhost, 55613)
16/03/21 19:52:35 INFO BlockManagerMaster: Registered BlockManager

Ich möchte, dass der Inhalt dieses DataFrame als Pandas @ zurückgegeben wir

Ich ta

df_users =  UserDistinct.toPandas()

aber ich habe diesen Fehler

16/03/21 20:01:08 ERROR Executor: Exception in task 7.0 in stage 6.0 (TID 439)
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
16/03/21 20:01:08 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Wie ist das möglich, wenn ich 143,5 GB RAM habe? Was kann ich tun

BEARBEITE

Meine Spark-Defaults

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this fi,le except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
#spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory              200g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

my spark context

conf = SparkConf()

conf.set("spark.app.name","teste")
conf.set("spark.driver.maxResultSize","0")

sc = SparkContext(conf=conf)

BEARBEITE

Alle Schritte

#import data for a pandas dataframe

df_ora = pd.read_sql(query, con=connection)

#change for Spark dataframe and some transformation

sqlContext = SQLContext(sc)
df_oraAS = sqlContext.createDataFrame(df_ora)
df_oraAS.registerTempTable("df_oraAS")

#new column
df_with_C = df_oraAS.withColumn("BUY", lit(1))

indexer = StringIndexer(inputCol="ENT_EMAIL", outputCol="user")

#index because I want use recommendation system
user_PK = indexer.fit(df_with_C).transform(df_with_C)

#distinct
UserDistinct = user_PK.dropDuplicates(['ENT_EMAIL' ,'user'])

#data in Pandas dataframe
df_users =  UserDistinct.toPandas()

New Edit

change for Driver 60g und Executor 60g

Error

16/03/22 09:53:40 INFO MemoryStore: Block taskresult_446 stored as bytes in memory (estimated size 1978.5 MB, free 22.5 GB)
16/03/22 09:53:40 INFO BlockManagerInfo: Added taskresult_446 in memory on localhost:56281 (size: 1978.5 MB, free: 20.4 GB)
16/03/22 09:53:40 INFO Executor: Finished task 14.0 in stage 6.0 (TID 446). 2074557399 bytes result sent via BlockManager)
16/03/22 09:53:40 INFO TaskSetManager: Starting task 25.0 in stage 6.0 (TID 457, localhost, partition 25,NODE_LOCAL, 1999 bytes)
16/03/22 09:53:40 INFO Executor: Running task 25.0 in stage 6.0 (TID 457)
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:54:04 ERROR Executor: Exception in task 18.0 in stage 6.0 (TID 450)
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
16/03/22 09:54:04 INFO TaskSetManager: Starting task 26.0 in stage 6.0 (TID 458, localhost, partition 26,NODE_LOCAL, 1999 bytes)
16/03/22 09:54:04 INFO Executor: Running task 26.0 in stage 6.0 (TID 458)
16/03/22 09:54:04 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-5,5,main]
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
16/03/22 09:54:05 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:54:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:54:05 INFO SparkContext: Invoking stop() from shutdown hook
16/03/22 09:54:06 WARN QueuedThreadPool: 6 threads could not be stopped
16/03/22 09:54:06 INFO SparkUI: Stopped Spark web UI at http://10.10.5.105:4040
16/03/22 09:54:08 INFO DAGScheduler: ResultStage 6 (toPandas at <stdin>:1) failed in 385.120 s
16/03/22 09:54:08 INFO DAGScheduler: Job 3 failed: toPandas at <stdin>:1, took 398.921433 s
16/03/22 09:54:09 ERROR Utils: Uncaught exception in thread task-result-getter-1
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
    at org.apache.spark.storage.BlockManager$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
    at org.apache.spark.scheduler.TaskResultGetter$anon$2$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
    at org.apache.spark.scheduler.TaskResultGetter$anon$2$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$anon$2$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.scheduler.TaskResultGetter$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Exception in thread "task-result-getter-1" java.lang.Error: java.lang.InterruptedException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
    at org.apache.spark.storage.BlockManager$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
    at org.apache.spark.scheduler.TaskResultGetter$anon$2$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
    at org.apache.spark.scheduler.TaskResultGetter$anon$2$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$anon$2$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.scheduler.TaskResultGetter$anon$2.run(TaskResultGetter.scala:50)
    ... 3 more
16/03/22 09:54:09 ERROR Utils: Uncaught exception in thread task-result-getter-2
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
    at org.apache.spark.storage.BlockManager$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
    at org.apache.spark.scheduler.TaskResultGetter$anon$2$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
    at org.apache.spark.scheduler.TaskResultGetter$anon$2$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$anon$2$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.scheduler.TaskResultGetter$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:/Apache/spark-1.6.0/python/pyspark\sql\dataframe.py", line 1378, in toPandas
    return pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:/Apache/spark-1.6.0/python/pyspark\sql\dataframe.py", line 280, in collect
    port = self._jdf.collectToPython()
  File "C:\Users\user\Anaconda\lib\site-packages\py4j\java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:/Apache/spark-1.6.0/python/pyspark\sql\utils.py", line 45, in deco
    return f(*a, **kw)
  File "C:\Users\user\Anaconda\lib\site-packages\py4j\protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)

Antworten auf die Frage(8)

Ihre Antwort auf die Frage