Это хорошая практика для обработки данных в StreamingQueryListener onQueryProgress метод структурированной потоковой передачи? - PullRequest
0 голосов
/ 31 октября 2019

Я читаю данные из kafka, выполняю некоторые агрегаты и сохраняю их в формате hdf. Я хочу объединить эти данные с прошлыми данными, которые были записаны в hdfs в том же каталоге. Поскольку прошлые данные будут обновляться, я не смогу объединить их с новыми данными, а также выполнить агрегирование новых данных, поскольку это не поддерживается в структурированной потоковой передаче 2.3.0. В соответствии с документом

As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used.

Cannot use streaming aggregations before joins.

я уже пытался выполнить потоковое соединение, но это не работает, потому что я хочу также агрегировать новые данные. Поэтому я делаю соединение в методе onQueryProgress StramQueryListener. Я видел только его использование в публикации статистики. Это хорошая практика для обработки данных в слушателях? Если нет, то что может быть альтернативным подходом.

override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
   if(event.progress.numInputRows > 0 && event.progress.sink.description.startsWith("FileSink") && event.progress.sink.description.contains(outputPath)) {
          processFlag = 0
   }
   if(event.progress.numInputRows == 0 && event.progress.sink.description.startsWith("FileSink") && event.progress.sink.description.contains(outputPath) && processFlag == 0) {
     "processing here"
     processFlag = 1
   }

Я получаю почасовые данные, поэтому у меня достаточно окна для обработки данных. А поскольку слушатели запускаются в отдельных потоках, это также неблокирующий код.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...