У меня есть структурированный поток данных tempDataFrame2
, состоящий из Field1
.Я пытаюсь вычислить приблизительное количество Field1
.Однако всякий раз, когда я набираю
val Array(Q1, Q3) = tempDataFrame2.stat.approxQuantile("Field1", Array(0.25, 0.75), 0.0)
, я получаю следующее сообщение об ошибке:
Queries with streaming sources must be executed with writeStream.start()
Ниже приведен фрагмент кода:
val tempDataFrame2 = A structured streaming dataframe
// Calculate IQR
val Array(Q1, Q3) = tempDataFrame2.stat.approxQuantile("Field1", Array(0.25, 0.75), 0.0)
// Filter messages
val tempDataFrame3 = tempDataFrame2.filter("Some working filter")
val query = tempDataFrame2.writeStream.outputMode("append").queryName("table").format("console").start()
query.awaitTermination()
Я уже прошел через эти две ссылки с SO: Link1 Link2 .К сожалению, я не могу связать эти ответы с моей проблемой.
Редактировать
После прочтения комментариев я планирую придерживаться следующего способа:
1) Прочитать все незафиксированные смещения из темы Кафки.2) Сохраните их в переменную dataframe.3) Остановите структурированный поток, чтобы я больше не читал тему Кафки.4) Начните обработку сохраненного кадра данных с шага 2).
Но, теперь я не уверен, что делать дальше -
1) например, как узнать, что у меня нет другогозаписи для использования в теме Kafka и остановки потокового запроса?