Хорошо. Но у меня есть 22Bill строки в таблице с различными значениями в этом столбце раздела, и я не могу дать предикаты для стольких значений.

аюсь переместить данные из таблицы в таблице PostgreSQL в таблицу Hive в HDFS. Для этого я придумал следующий код:

  val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2017")
                                                                      .option("user", devUserName).option("password", devPassword)
                                                                      .option("partitionColumn","cast_id")
                                                                      .option("lowerBound", 1).option("upperBound", 100000)
                                                                      .option("numPartitions",70).load()
        val totalCols:List[String] = splitColumns ++ textList
        val cdt                    = new ChangeDataTypes(totalCols, dataMapper)
        hiveDataTypes              = cdt.gpDetails()
        val fc                     = prepareHiveTableSchema(hiveDataTypes, partition_columns)
        val allColsOrdered         = yearDF.columns.diff(partition_columns) ++ partition_columns
        val allCols                = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
        val resultDF               = yearDF.select(allCols:_*)
        val stringColumns          = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
        val finalDF                = stringColumns.foldLeft(resultDF) {
          (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+"," "))
        }
        finalDF
  }
    val dataDF = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
    val dataDFPart = dataDF.repartition(30)
    dataDFPart.createOrReplaceTempView("preparedDF")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

Данные вставляются в таблицу кустов, динамически разбитых на основеprtn_String_columns: source_system_name, period_year, period_num

Spark-submit использовал:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar --num-executors 80 --executor-cores 5 --executor-memory 50G --driver-memory 20G --driver-cores 3 --class com.partition.source.YearPartition splinter_2.11-0.1.jar --master=yarn --deploy-mode=cluster --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal [email protected] --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties --name Splinter --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

Следующие сообщения об ошибках генерируются в журналах исполнителя:

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

Я вижу в журналах, что чтение выполняется правильно с заданным количеством разделов, как показано ниже:

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50]

Ниже приведено состояние исполнителей по этапам:

Данные не разделены должным образом. Один раздел меньше, а другой становится огромным. Здесь есть проблема перекоса. При вставке данных в таблицу Hive задание завершается ошибкой:spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF") но я понимаю, что это происходит из-за проблемы перекоса данных.

Я пытался увеличить количество исполнителей, увеличивая память исполнителя, память драйвера, пытался просто сохранить как файл csv вместо сохранения кадра данных в таблицу Hive, но ничто не влияет на выполнение, выдавая исключение:

java.lang.OutOfMemoryError: GC overhead limit exceeded

Есть ли в коде что-то, что мне нужно исправить? Может ли кто-нибудь дать мне знать, как я могу решить эту проблему?

Ответы на вопрос(3)

Ваш ответ на вопрос