Я читаю данные из kafka, выполняю некоторые агрегаты и сохраняю их в формате hdf. Я хочу объединить эти данные с прошлыми данными, которые были записаны в hdfs в том же каталоге. Поскольку прошлые данные будут обновляться, я не смогу объединить их с новыми данными, а также выполнить агрегирование новых данных, поскольку это не поддерживается в структурированной потоковой передаче 2.3.0. В соответствии с документом
As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used.
Cannot use streaming aggregations before joins.
я уже пытался выполнить потоковое соединение, но это не работает, потому что я хочу также агрегировать новые данные. Поэтому я делаю соединение в методе onQueryProgress StramQueryListener. Я видел только его использование в публикации статистики. Это хорошая практика для обработки данных в слушателях? Если нет, то что может быть альтернативным подходом.
override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
if(event.progress.numInputRows > 0 && event.progress.sink.description.startsWith("FileSink") && event.progress.sink.description.contains(outputPath)) {
processFlag = 0
}
if(event.progress.numInputRows == 0 && event.progress.sink.description.startsWith("FileSink") && event.progress.sink.description.contains(outputPath) && processFlag == 0) {
"processing here"
processFlag = 1
}
Я получаю почасовые данные, поэтому у меня достаточно окна для обработки данных. А поскольку слушатели запускаются в отдельных потоках, это также неблокирующий код.