Spark Структурированная потоковая передача нескольких WriteStreams в один и тот же приемник - PullRequest
0 голосов
/ 11 июня 2018

Два Writestream для одного и того же приемника базы данных не происходят последовательно в Spark Structured Streaming 2.2.1.Пожалуйста, предложите, как заставить их выполняться последовательно.

val deleteSink = ds1.writestream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

val UpsertSink = ds2.writestream
  .outputMode("update")
  .foreach(mydbsink)
  .start()

deleteSink.awaitTermination()
UpsertSink.awaitTermination()

Используя приведенный выше код, deleteSink выполняется после UpsertSink.

1 Ответ

0 голосов
/ 19 июня 2018

Если вы хотите, чтобы два потока работали параллельно, вы должны использовать

sparkSession.streams.awaitAnyTermination()

вместо

deleteSink.awaitTermination()
UpsertSink.awaitTermination()

В вашем случае UpsertSink никогда не запустится, если только deleteSink не будет остановленили выданное исключение, как сказано в scaladoc

Ожидает завершения запроса this, либо query.stop(), либо исключением.Если запрос завершился с исключением, то будет сгенерировано исключение.Если запрос завершен, то все последующие вызовы этого метода будут либо немедленно возвращаться (если запрос был завершен с помощью stop()), либо немедленно генерировать исключение (если запрос завершен с исключением).

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...