Нет, когда логика тоже имеет некоторые недостатки. Я имею в виду способ, которым вы написали это, поскольку все генерирует ноль в столбце статуса. Вы можете попробовать и увидеть это сами. отлаживать и улучшать. Я думаю, вы можете сделать это. :)

ичок в искре / скале. Я пытаюсь прочитать некоторые данные из таблицы кустов в искровой фрейм данных, а затем добавить столбец на основе некоторого условия. Вот мой код:

val DF = hiveContext.sql("select * from (select * from test_table where partition_date='2017-11-22') a JOIN (select max(id) as bid from test_table where partition_date='2017-11-22' group by at_id) b ON a.id=b.bid")

def dateDiff(partition_date: org.apache.spark.sql.Column, item_due_date: org.apache.spark.sql.Column): Long ={
      ChronoUnit.DAYS.between(LocalDate.parse(partition_date.toString()), LocalDate.parse(item_due_date.toString))
    }

val finalDF = DF.withColumn("status", 
                   when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && !(col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "approved")
                  .when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_,due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && (col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "pending")
                  .when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) >= 0), "expired")
                  .otherwise("null"))

dateDiff это функция, которая вычисляет разницу междуpartition_date а такжеitem_due_date, которые являются столбцами вDF, Я пытаюсь добавить новый столбец вDF используяwhen а такжеotherwise который используетdateDiff чтобы получить разницу между датами.

Теперь, когда я запускаю приведенный выше код, я получаю следующую ошибку:org.threeten.bp.format.DateTimeParseException: Text 'partition_date' could not be parsed at index 0

Я считаю ценность колонкиpartition_date не преобразуется в строку для анализа в качестве даты. Это то, что происходит? Если да, как я могу привести значение столбца в строку?

Ниже приведена схема столбцов, которые я использую изDF :

 |-- item_due_date: string (nullable = true)
 |-- past_due: integer (nullable = true)
 |-- item_decision: string (nullable = true)
 |-- partition_date: string (nullable = true)

Образец данных столбцов, которые я использую изDF :

+--------+-------------+-------------+--------------+
|past_due|item_due_date|item_decision|partition_date|
+--------+-------------+-------------+--------------+
|       1|   0001-01-14|         null|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       1|   0001-01-14|     Mitigate|    2017-11-22|
|       0|   2018-03-18|         null|    2017-11-22|
|       1|   2016-11-30|         null|    2017-11-22|
+--------+-------------+-------------+--------------+

Я также попытался использовать пользовательский UDF:

  def status(past_due: Int, item_decision: String, maxPartitionDate: String, item_due_date: String): String = {
      if (past_due == 1 && item_due_date != "NULL") {
        if (ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(item_due_date.trim)) < 0) {
          if (item_decision != "NULL") "pending"
          else "approved"
        } else "expired"
      } else "NULL"
    }

val statusUDF = sqlContext.udf.register("statusUDF", status _)

val DF2 = DF.withColumn("status", statusUDF(DF("past_due"),DF("item_decision"),DF("partition_date"),DF("item_due_date")))
DF2.show()

И это бросает следующую ошибку вDF2.show заявление, каждый раз:

Container exited with a non-zero exit code 50

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1433)
        at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
        at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
        at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1644)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1603)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1592)
        at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
        at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
        at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
        at org.apache.spark.sql.DataFrame$anonfun$org$apache$spark$sql$DataFrame$execute это функция, которая вычисляет разницу между1.apply(DataFrame.scala:1499)
        at org.apache.spark.sql.DataFrame$anonfun$org$apache$spark$sql$DataFrame$execute это функция, которая вычисляет разницу между1.apply(DataFrame.scala:1499)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
        at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$execute$1(DataFrame.scala:1498)
        at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$collect(DataFrame.scala:1505)
        at org.apache.spark.sql.DataFrame$anonfun$head$1.apply(DataFrame.scala:1375)
        at org.apache.spark.sql.DataFrame$anonfun$head$1.apply(DataFrame.scala:1374)
        at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
        at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
        at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
        at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
        at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
        at driver$.main(driver.scala:109)
        at driver.main(driver.scala)

Любая помощь будет оценена. Спасибо!

 Hemanth27 нояб. 2017 г., 05:19
Я знаю, что это неUDF, Это обычная функция scala для получения разницы в датах. @cue Есть идеи, как заставить это работать?
 nabongs27 нояб. 2017 г., 05:17
@Hemanth написанная вами функция неudfпочитайте внимательнее о том, как правильно писатьudf.
 Hemanth27 нояб. 2017 г., 05:00
Я пытался использовать UDF вместоwhen а такжеotherwise, но я столкнулся с ошибками при отображении / сохранении данных. Поэтому я перешел на этот подход. Есть ли способ, которым я могу решить ошибку в этом подходе?
 Ramesh Maharjan27 нояб. 2017 г., 05:05
Вам нужно будет обновить свой вопрос с помощью образца данных и схемы данных. это поможет вам быстро получить ответы
 Ramesh Maharjan27 нояб. 2017 г., 04:57
Ваша функция основана на столбцах. так что вы можете использоватьискровые функции если какие-либо функции удовлетворяют вашим потребностям. В противном случае, если вы хотите манипулировать примитивными типами данных, вам придется использовать функции udf.

Ответы на вопрос(1)

Решение Вопроса

Вы можете просто использоватьdatediff встроенная функция для проверки разницы дней между двумя столбцами. вам не нужно писать свою функцию илиudf функция. И когда функция тоже модифицируется, чем ваша

import org.apache.spark.sql.functions._
val finalDF = DF.withColumn("status",
  when(col("past_due").equalTo(1) && col("item_due_date").isNotNull && !(lower(col("item_due_date")).equalTo("null")) && (datediff(col("partition_date"),col("item_due_date")) < 0) && col("item_decision").isNotNull && !(lower(col("item_decision")).equalTo("null")), "approved")
    .otherwise(when(col("past_due").equalTo(1) && col("item_due_date").isNotNull && !(lower(col("item_due_date")).equalTo("null")) && (datediff(col("partition_date"),col("item_due_date")) < 0) && (col("item_decision").isNull || lower(col("item_decision")).equalTo("null")), "pending")
      .otherwise(when(col("past_due").equalTo(1) && col("item_due_date").isNotNull && !(lower(col("item_due_date")).equalTo("null")) && (datediff(col("partition_date"),col("item_due_date")) >= 0), "expired")
    .otherwise("null"))))

Эта логика преобразуетdataframe

+--------+-------------+-------------+--------------+
|past_due|item_due_date|item_decision|partition_date|
+--------+-------------+-------------+--------------+
|1       |2017-12-14   |null         |2017-11-22    |
|1       |2017-12-14   |Mitigate     |2017-11-22    |
|1       |0001-01-14   |Mitigate     |2017-11-22    |
|1       |0001-01-14   |Mitigate     |2017-11-22    |
|0       |2018-03-18   |null         |2017-11-22    |
|1       |2016-11-30   |null         |2017-11-22    |
+--------+-------------+-------------+--------------+

с добавлениемstatus столбец как

+--------+-------------+-------------+--------------+--------+
|past_due|item_due_date|item_decision|partition_date|status  |
+--------+-------------+-------------+--------------+--------+
|1       |2017-12-14   |null         |2017-11-22    |pending |
|1       |2017-12-14   |Mitigate     |2017-11-22    |approved|
|1       |0001-01-14   |Mitigate     |2017-11-22    |expired |
|1       |0001-01-14   |Mitigate     |2017-11-22    |expired |
|0       |2018-03-18   |null         |2017-11-22    |null    |
|1       |2016-11-30   |null         |2017-11-22    |expired |
+--------+-------------+-------------+--------------+--------+

Я надеюсь, что ответ полезен

 Hemanth27 нояб. 2017 г., 06:55
Ты Спаситель! Благодаря тонну! :)
 Ramesh Maharjan27 нояб. 2017 г., 07:53
ваши даты должны быть такими. пожалуйста, проверьте с датами. :)
 Hemanth27 нояб. 2017 г., 07:24
Программа работает без проблем, но я вижу толькоexpired а такжеnull значения вstatus колонка. Есть ли проблема с логикой?
 Ramesh Maharjan27 нояб. 2017 г., 08:17
Нет, когда логика тоже имеет некоторые недостатки. Я имею в виду способ, которым вы написали это, поскольку все генерирует ноль в столбце статуса. Вы можете попробовать и увидеть это сами. отлаживать и улучшать. Я думаю, вы можете сделать это. :)
 Hemanth27 нояб. 2017 г., 08:12
Кажется, это так. Кроме того, мое «когда» состояние также должно работать нормально, верно? Хотелось бы узнать, есть ли разница.

Ваш ответ на вопрос