Я пытаюсь написать код, который сначала выполняет объединение, а затем выполняет агрегацию (группирование и подсчет).
Я хочу, чтобы вывод моего этапа агрегирования был обновляемым. Ниже код, который я использую:
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "testerIn")
.load().selectExpr("CAST(value AS STRING)").as[String]
val interimDF = df.join(df,"value")
val newDF = interimDF.groupBy("value").count().toJSON
newDF.writeStream.format("kafka").outputMode("update") .option("kafka.bootstrap.servers", "localhost:9092") . option("checkpointLocation","/path/to/directory")
.option("topic", "tester").start()
spark.streams.awaitAnyTermination()
Этот код выдает ошибку, потому что режим обновления не поддерживается объединениями потокового потока в spark:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported in Update output mode, only in Append output mode;;
Теперь я полностью понимаю, почему свеча бросает эту ошибку, потому что, когда мы присоединяемся; Режим обновления вряд ли имеет какой-либо смысл (потому что мы будем получать новую строку в выходных данных всякий раз, когда на входе появляется новая строка, следовательно, добавляем).
Если бы я должен был вывести кадр данных после моего соединения (interimDF) в Kafka в режиме добавления, а затем прочитать его, выполнить этап агрегации (newDF) и записать его обратно в какой-то другой поток в режиме обновления, моя проблема была бы решена. Это именно то, что я хочу сделать, но я хочу избежать написания на сцене Кафки в середине. Есть ли способ, которым это возможно? Я также готов принять хакерские решения или ссылку на запрос на удаление, который кто-то мог сделать в отношении подобных вещей.