Я получаю данные в одной дельта-таблице и хочу, чтобы два потребителя обрабатывали ее:
У меня в разных банках эти коды:
1) Процесс Spark для вычисления агрегации в реальном времени.
val df_aggregations = spark.readStream
.format("delta")
.option("ignoreDeletes", "true")
.option("ignoreChanges", "true")
.option("checkpointLocation", configuration.delta_aggregation_checkpoint)
.load(configuration.delta_table)
2) Процесс Spark для получения новых значений в режиме реального времени.
val df_news = spark.readStream
.format("delta")
.option("ignoreDeletes", "true")
.option("ignoreChanges", "true")
.option("checkpointLocation", configuration.delta_news_checkpoint)
.load(configuration.delta_table)
Моя проблема в том, что я просто вижу, что один из процессов работает нормально, я имею в виду, если я запускаю процесс 1) сначала, чем 2) Я вижу хорошие результаты процесса 1), однако, я не вижу результатов процесс 2), и если я сначала запускаю процесс 2), то я вижу результаты процесса 2), но я не вижу результатов процесса 1).