Как использовать udf для вычисления данных из таблицы - PullRequest
0 голосов
/ 14 февраля 2019

Я пытаюсь извлечь данные из таблицы на основе объединения с данными, поступающими из потока.

Запрос выполняется правильно в отдельной записной книжке, но в этом udf он вызывает | java.lang.NullPointerException

UDF определяется как

    val usersOrgMap = users.join(orgs, Seq("orgid"), "left")
 def getOrgId: (String => String) = { s => 
        try {
        ((usersOrgMap.where($"userid" === s)).first().mkString(",").trim).toString
        } catch {
            case e: Exception =>  e.toString()
        }
    }

val getOrgIdUDF = udf(getOrgId)

и использует его как

val messages =
      incomingStream
.withColumn("OrgId",getOrgIdUDF(get_json_object(($"body").cast("string"), "$.UserGuidTrimmed")))
  .select("OrgId")
messages.writeStream.outputMode("append").format("console").option("truncate", false).start().awaitTermination()

Есть ли лучший способ получить данные из таблицы, используя данные json в потоке?

...