У меня есть поток DataFrame в Databricks, и я хочу выполнить действие для каждого элемента. В сети я нашел методы специального назначения, такие как запись в консоль или сброс в память, но я хочу добавить некоторую бизнес-логику и поместить некоторые результаты в Redis.
Если быть более точным, то это будет выглядеть в не потоковом случае:
val someDataFrame = Seq(
("key1", "value1"),
("key2", "value2"),
("key3", "value3"),
("key4", "value4")
).toDF()
def someFunction(keyValuePair: (String, String)) = {
println(keyValuePair)
}
someDataFrame.collect.foreach(r => someFunction((r(0).toString, r(1).toString)))
Но если someDataFrame
- это не простой фрейм данных, а фрейм потоковых данных (действительно исходящий от Kafka), сообщение об ошибке будет таким:
org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
Может ли кто-нибудь помочь мне решить эту проблему?
Некоторые важные замечания:
Я прочитал соответствующую документацию, такую как Spark Streaming или Databricks Streaming, а также несколько других описаний.
Я знаю, что должно быть что-то вроде start()
и awaitTermination
, но я не знаю точного синтаксиса. Описания не помогли.
Потребуются страницы, чтобы перечислить все возможности, которые я пробовал, поэтому я не буду их предоставлять.
Я не хочу решить конкретную проблему отображения результата. То есть пожалуйста, не предоставляйте решение этого конкретного случая. someFunction
будет выглядеть так:
val someData = readSomeExternalData()
if (condition containing keyValuePair and someData) {
doSomething(keyValuePair);
}
(Вопрос Какова цель ForeachWriter в Spark Structured Streaming? не дает рабочего примера, поэтому не отвечает на мой вопрос.)