Spark Scala: SELECT in a foreach loop returns java.lang.NullPointerException
I need to iterate over the contents of a DF with several SELECT statements inside a foreach loop, writing the output to text files. Any SELECT statement inside the foreach loop returns a NullPointerException, and I can't see why. A SELECT statement inside a "for" loop does not return this error.
This is the test case.
// step 1 of 6: create the table and load two rows
vc.sql(s"""CREATE TEMPORARY TABLE TEST1 (
c1 varchar(4)
,username varchar(5)
,numeric integer) USING com.databricks.spark.csv OPTIONS (path "/tmp/test.txt")""")
// step 2 of 6: confirm that the data is queryable
vc.sql("SELECT * FROM TEST1").show()
+----+--------+-------+
|  c1|username|numeric|
+----+--------+-------+
|col1|   USER1|      0|
|col1|   USER2|      1|
+----+--------+-------+
// step 3 of 6: create a dataframe for the table
var df=vc.sql("""SELECT * FROM TEST1""")
// step 4 of 6: create a second dataframe that we will use as a loop iterator
var df_usernames=vc.sql(s"""SELECT DISTINCT username FROM TEST1 """)
// step 5 of 6: first foreach loop works ok:
df_usernames.foreach(t =>
{
println("(The First foreach works ok: loop iterator t is " + t(0).toString() )
}
)
(The First foreach works ok: loop iterator t is USER1
(The First foreach works ok: loop iterator t is USER2
// step 6 of 6: second foreach with any embedded SQL returns an error
df_usernames.foreach(t =>
{
println("(The second foreach dies: loop iterator t is " + t(0).toString() )
vc.sql("""SELECT c1 FROM TEST1""").show()
}
)
(The second foreach dies: loop iterator t is USER1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 158 in stage 94.0 failed 1 times, most recent failure: Lost task 158.0 in stage 94.0 (TID 3525, localhost): java.lang.NullPointerException
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:195)
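For comparison, here is a minimal sketch of the "for" loop variant mentioned at the top, which does not hit the error. It assumes the same spark-shell session as the test case (so vc and df_usernames already exist) and that the list of usernames is small enough to bring to the driver with collect(). The usernames value and the WHERE filter are illustrative only and are not part of the test case. One plausible difference is that this loop runs on the driver, while the closure passed to foreach runs inside Spark tasks.

```scala
// Continues the spark-shell session above: vc and df_usernames come from the test case.
// Assumption: the distinct usernames fit comfortably in driver memory.
val usernames = df_usernames.collect().map(row => row(0).toString)

// A plain Scala loop over a local array, so vc.sql is called on the driver.
for (username <- usernames) {
  println("for loop works ok: loop iterator is " + username)
  // Illustrative query only: the values are trusted test data, so simple
  // string interpolation is used to build the filter.
  vc.sql(s"SELECT c1 FROM TEST1 WHERE username = '$username'").show()
}
```

The per-user output could then be written to text files from inside this loop, since each query result is an ordinary DataFrame on the driver side.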