Я пытаюсь извлечь данные из таблицы на основе объединения с данными, поступающими из потока.
Запрос выполняется правильно в отдельной записной книжке, но в этом udf он вызывает | java.lang.NullPointerException
UDF определяется как
val usersOrgMap = users.join(orgs, Seq("orgid"), "left")
def getOrgId: (String => String) = { s =>
try {
((usersOrgMap.where($"userid" === s)).first().mkString(",").trim).toString
} catch {
case e: Exception => e.toString()
}
}
val getOrgIdUDF = udf(getOrgId)
и использует его как
val messages =
incomingStream
.withColumn("OrgId",getOrgIdUDF(get_json_object(($"body").cast("string"), "$.UserGuidTrimmed")))
.select("OrgId")
messages.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()
Есть ли лучший способ получить данные из таблицы, используя данные json в потоке?