Apache Spark: обновить режим вывода после операции объединения потоковых наборов данных - PullRequest
1 голос
/ 27 марта 2019

Я пытаюсь написать код, который сначала выполняет объединение, а затем выполняет агрегацию (группирование и подсчет).

Я хочу, чтобы вывод моего этапа агрегирования был обновляемым. Ниже код, который я использую:

    val spark = SparkSession.builder().master("local").getOrCreate()

    import spark.implicits._


    val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "testerIn")
      .load().selectExpr("CAST(value AS STRING)").as[String]


    val interimDF = df.join(df,"value")

    val newDF = interimDF.groupBy("value").count().toJSON

    newDF.writeStream.format("kafka").outputMode("update") .option("kafka.bootstrap.servers", "localhost:9092") . option("checkpointLocation","/path/to/directory")
      .option("topic", "tester").start()

    spark.streams.awaitAnyTermination()

Этот код выдает ошибку, потому что режим обновления не поддерживается объединениями потокового потока в spark:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported in Update output mode, only in Append output mode;;

Теперь я полностью понимаю, почему свеча бросает эту ошибку, потому что, когда мы присоединяемся; Режим обновления вряд ли имеет какой-либо смысл (потому что мы будем получать новую строку в выходных данных всякий раз, когда на входе появляется новая строка, следовательно, добавляем).

Если бы я должен был вывести кадр данных после моего соединения (interimDF) в Kafka в режиме добавления, а затем прочитать его, выполнить этап агрегации (newDF) и записать его обратно в какой-то другой поток в режиме обновления, моя проблема была бы решена. Это именно то, что я хочу сделать, но я хочу избежать написания на сцене Кафки в середине. Есть ли способ, которым это возможно? Я также готов принять хакерские решения или ссылку на запрос на удаление, который кто-то мог сделать в отношении подобных вещей.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...