Запросы с потоковыми источниками должны выполняться с помощью writeStream.start () - PullRequest
0 голосов
/ 11 июня 2018

У меня есть структурированный поток данных tempDataFrame2, состоящий из Field1.Я пытаюсь вычислить приблизительное количество Field1.Однако всякий раз, когда я набираю

val Array(Q1, Q3) = tempDataFrame2.stat.approxQuantile("Field1", Array(0.25, 0.75), 0.0), я получаю следующее сообщение об ошибке:

Queries with streaming sources must be executed with writeStream.start()

Ниже приведен фрагмент кода:

val tempDataFrame2 = A structured streaming dataframe

// Calculate IQR
val Array(Q1, Q3) = tempDataFrame2.stat.approxQuantile("Field1", Array(0.25, 0.75), 0.0)

// Filter messages
val tempDataFrame3 = tempDataFrame2.filter("Some working filter")

val query = tempDataFrame2.writeStream.outputMode("append").queryName("table").format("console").start()
query.awaitTermination()

Я уже прошел через эти две ссылки с SO: Link1 Link2 .К сожалению, я не могу связать эти ответы с моей проблемой.

Редактировать

После прочтения комментариев я планирую придерживаться следующего способа:

1) Прочитать все незафиксированные смещения из темы Кафки.2) Сохраните их в переменную dataframe.3) Остановите структурированный поток, чтобы я больше не читал тему Кафки.4) Начните обработку сохраненного кадра данных с шага 2).

Но, теперь я не уверен, что делать дальше -

1) например, как узнать, что у меня нет другогозаписи для использования в теме Kafka и остановки потокового запроса?

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...