Как передать массив [Seq [String]] в apache spark udf? (Ошибка: не применимо)
У меня есть следующий apache spark udf в scala:
val myFunc = udf {
(userBias: Float, otherBiases: Map[Long, Float],
userFactors: Seq[Float], context: Seq[String]) =>
var result = Float.NaN
if (userFactors != null) {
var contexBias = 0f
for (cc <- context) {
contexBias += otherBiases(contextMapping(cc))
}
// definition of result
// ...
}
result
}
Теперь я хочу передать параметры этой функции, однако всегда получаю сообщение Not Applicable из-за параметраcontext
, Я знаю, что пользовательские функции принимают входные данные по строкам, и эта функция запускается, если я удаляюcontext
... как решить эту проблему? Почему он не читает строки изArray[Seq[String]]
то есть изcontext
? В качестве альтернативы, было бы приемлемо пройтиcontext
какDataFrame
или что-то подобное.
// context is Array[Seq[String]]
val a = sc.parallelize(Seq((1,2),(3,4))).toDF("a", "b")
val context = a.collect.map(_.toSeq.map(_.toString))
// userBias("bias"), otherBias("biases") and userFactors("features")
// have a type Column, while userBias... are DataFrames
myDataframe.select(dataset("*"),
myFunc(userBias("bias"),
otherBias("biases"),
userFactors("features"),
context)
.as($(newCol)))
ОБНОВИТЬ:
Я пробовал решение, указанное в ответеzero323
Однако все еще есть небольшая проблема сcontext: Array[Seq[String]]
, В частности, проблема с циклом над этим массивомfor (cc <- context) { contexBias += otherBiases(contextMapping(cc)) }
, Я должен передать строкуcontextMapping
неSeq[String]
:
def myFunc(context: Array[Seq[String]]) = udf {
(userBias: Float, otherBiases: Map[Long, Float],
userFactors: Seq[Float]) =>
var result = Float.NaN
if (userFactors != null) {
var contexBias = 0f
for (cc <- context) {
contexBias += otherBiases(contextMapping(cc))
}
// estimation of result
}
result
}
Теперь я называю это следующим образом:
myDataframe.select(dataset("*"),
myFunc(context)(userBias("bias"),
otherBias("biases"),
userFactors("features"))
.as($(newCol)))