Нет, когда логика тоже имеет некоторые недостатки. Я имею в виду способ, которым вы написали это, поскольку все генерирует ноль в столбце статуса. Вы можете попробовать и увидеть это сами. отлаживать и улучшать. Я думаю, вы можете сделать это. :)
ичок в искре / скале. Я пытаюсь прочитать некоторые данные из таблицы кустов в искровой фрейм данных, а затем добавить столбец на основе некоторого условия. Вот мой код:
val DF = hiveContext.sql("select * from (select * from test_table where partition_date='2017-11-22') a JOIN (select max(id) as bid from test_table where partition_date='2017-11-22' group by at_id) b ON a.id=b.bid")
def dateDiff(partition_date: org.apache.spark.sql.Column, item_due_date: org.apache.spark.sql.Column): Long ={
ChronoUnit.DAYS.between(LocalDate.parse(partition_date.toString()), LocalDate.parse(item_due_date.toString))
}
val finalDF = DF.withColumn("status",
when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && !(col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "approved")
.when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_,due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) < 0) && (col("item_decision").equalTo(null) || col("item_decision").equalTo("NULL") || col("item_decision").equalTo("null")), "pending")
.when(col("past_due").equalTo(1) && !(col("item_due_date").equalTo(null) || col("item_due_date").equalTo("NULL") || col("item_due_date").equalTo("null")) && (dateDiff(col("partition_date"),col("item_due_date")) >= 0), "expired")
.otherwise("null"))
dateDiff
это функция, которая вычисляет разницу междуpartition_date
а такжеitem_due_date
, которые являются столбцами вDF
, Я пытаюсь добавить новый столбец вDF
используяwhen
а такжеotherwise
который используетdateDiff
чтобы получить разницу между датами.
Теперь, когда я запускаю приведенный выше код, я получаю следующую ошибку:org.threeten.bp.format.DateTimeParseException: Text 'partition_date' could not be parsed at index 0
Я считаю ценность колонкиpartition_date
не преобразуется в строку для анализа в качестве даты. Это то, что происходит? Если да, как я могу привести значение столбца в строку?
Ниже приведена схема столбцов, которые я использую изDF
:
|-- item_due_date: string (nullable = true)
|-- past_due: integer (nullable = true)
|-- item_decision: string (nullable = true)
|-- partition_date: string (nullable = true)
Образец данных столбцов, которые я использую изDF
:
+--------+-------------+-------------+--------------+
|past_due|item_due_date|item_decision|partition_date|
+--------+-------------+-------------+--------------+
| 1| 0001-01-14| null| 2017-11-22|
| 1| 0001-01-14| Mitigate| 2017-11-22|
| 1| 0001-01-14| Mitigate| 2017-11-22|
| 1| 0001-01-14| Mitigate| 2017-11-22|
| 0| 2018-03-18| null| 2017-11-22|
| 1| 2016-11-30| null| 2017-11-22|
+--------+-------------+-------------+--------------+
Я также попытался использовать пользовательский UDF:
def status(past_due: Int, item_decision: String, maxPartitionDate: String, item_due_date: String): String = {
if (past_due == 1 && item_due_date != "NULL") {
if (ChronoUnit.DAYS.between(LocalDate.parse(maxPartitionDate.trim), LocalDate.parse(item_due_date.trim)) < 0) {
if (item_decision != "NULL") "pending"
else "approved"
} else "expired"
} else "NULL"
}
val statusUDF = sqlContext.udf.register("statusUDF", status _)
val DF2 = DF.withColumn("status", statusUDF(DF("past_due"),DF("item_decision"),DF("partition_date"),DF("item_due_date")))
DF2.show()
И это бросает следующую ошибку вDF2.show
заявление, каждый раз:
Container exited with a non-zero exit code 50
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1433)
at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1421)
at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1420)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1420)
at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1644)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1603)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1592)
at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1844)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$anonfun$org$apache$spark$sql$DataFrame$execute это функция, которая вычисляет разницу между1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.DataFrame$anonfun$org$apache$spark$sql$DataFrame$execute это функция, которая вычисляет разницу между1.apply(DataFrame.scala:1499)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$execute$1(DataFrame.scala:1498)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$collect(DataFrame.scala:1505)
at org.apache.spark.sql.DataFrame$anonfun$head$1.apply(DataFrame.scala:1375)
at org.apache.spark.sql.DataFrame$anonfun$head$1.apply(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
at driver$.main(driver.scala:109)
at driver.main(driver.scala)
Любая помощь будет оценена. Спасибо!