Как использовать EventHubsForeachWriter в Azure Databricks - PullRequest
0 голосов
/ 28 ноября 2018

Я пытаюсь использовать EventHubsForeachWriter, как показано здесь :

val ehConf = EventHubsConf("YOUR_CONNECTION_STRING") 
val writer = EventHubsForeachWriter(ehConf)

val query =
  streamingSelectDF
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()

, но я получаю исключение:

notebook:22: error: type mismatch;
 found   : org.apache.spark.sql.eventhubs.EventHubsForeachWriter
 required: org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row]
    .foreach(writer)

1 Ответ

0 голосов
/ 29 января 2019

Нашел ответ в этом github выпуск .

Итак, я думаю, должно работать следующее:

val query =
  streamingSelectDF
    .select(to_json(struct("*")) as 'body)
    .selectExpr("cast(body as string)")
    .as[String]
    .writeStream
    .foreach(writer)
    .outputMode("update")
    .trigger(ProcessingTime("25 seconds"))
    .start()
...