Spark (Scala) Структурированная потоковая агрегация и самостоятельное объединение - PullRequest
0 голосов
/ 12 сентября 2018

Я пытаюсь выполнить агрегацию с последующим самостоятельным объединением в структурированном потоке DataFrame. Предположим, что df выглядит следующим образом:

sourceDf.show(false)
+-----+-------+
|owner|fruits |
+-----+-------+
|Brian|apple  |
|Brian|pear   |
|Brian|melon  |
|Brian|avocado|
|Bob  |avocado|
|Bob  |apple  |
+-----+-------+

На статическом DataFrame это просто:

val aggDf = sourceDf.groupBy($"owner").agg(collect_list(col("fruits")) as "fruitsA")
sourceDf.join(aggDf, Seq("owner")).show(false)
+-----+-------+-----------------------------+
|owner|fruits |fruitsA                      |
+-----+-------+-----------------------------+
|Brian|apple  |[apple, pear, melon, avocado]|
|Brian|pear   |[apple, pear, melon, avocado]|
|Brian|melon  |[apple, pear, melon, avocado]|
|Brian|avocado|[apple, pear, melon, avocado]|
|Bob  |avocado|[avocado, apple]             |
|Bob  |apple  |[avocado, apple]             |
+-----+-------+-----------------------------+

К сожалению, я не могу понять, как это сделать в случае потоковой передачи DataFrame. Итак, я попытался использовать следующий полный код, который использует Kafka для Source и Sink:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructType}


object Test {

  val spark: SparkSession = SparkSession.builder().getOrCreate()
  import spark.implicits._

  val brokers = "kafka:9092"

  val inputTopic = "test.kafka.sink.input"
  val aggTopic = "test.kafka.sink.agg"
  val outputTopicSelf = "test.kafka.sink.output.self"
  val outputTopicTwo = "test.kafka.sink.output.two"

  val payloadSchema: StructType = new StructType()
    .add("owner", StringType)
    .add("fruits", StringType)

  val payloadSchemaA: StructType = new StructType()
    .add("owner", StringType)
    .add("fruitsA", StringType)

  var joinedDfSchema: StructType = _

  val sourceDf: DataFrame = Seq(
    ("Brian", "apple"),
    ("Brian", "pear"),
    ("Brian", "melon"),
    ("Brian", "avocado"),
    ("Bob", "avocado"),
    ("Bob", "apple")
  )
    .toDF("owner", "fruits")

  val additionalData: DataFrame = Seq(("Bob", "grapes")).toDF("owner", "fruits")

  def saveDfToKafka(df: DataFrame): Unit = {
    df
      .select(to_json(struct(df.columns.map(column): _*)).alias("value"))
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("topic", inputTopic)
      .save()
  }

  // save data to kafka (batch)
  saveDfToKafka(sourceDf)

  // kafka source
  val farmDF: DataFrame = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("startingOffsets", "earliest")
    .option("subscribe", inputTopic)
    .load()
    .byteArrayToString("value")
    .withColumn("value", from_json($"value", payloadSchema))
    .expand("value")

  farmDF.printSchema()

  implicit class DFHelper(df: DataFrame) {
    def expand(column: String): DataFrame = {
      val wantedColumns = df.columns.filter(_ != column) :+ s"$column.*"
      df.select(wantedColumns.map(col): _*)
    }

    def byteArrayToString(column: String): DataFrame = {
      val selectedCols = df.columns.filter(_ != column) :+ s"CAST($column AS STRING)"
      df.selectExpr(selectedCols: _*)
    }
  }

  def testSelfAggJoinFail(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .groupBy($"owner")
      .agg(collect_list(col("fruits")) as "fruitsA")

    // joined df
    val joinedDF = farmDF
      .join(myFarmDF.as("myFarmDF"), Seq("owner"))
      .select("owner", "fruits", "myFarmDF.fruitsA")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointSelf")
      .option("topic", outputTopicSelf)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def testSelfAggJoin(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .withWatermark("timestamp", "30 seconds")
      .groupBy(
        window($"timestamp", "30 seconds", "15 seconds"),
        $"owner"
      )
      .agg(collect_list(col("fruits")) as "fruitsA")
      .select("owner", "fruitsA", "window")

    // joined df
    val joinedDF = farmDF
        .as("farmDF")
      .withWatermark("timestamp", "30 seconds")
      .join(
        myFarmDF.as("myFarmDF"),
        expr(
          """
            |farmDF.owner = myFarmDF.owner AND
            |farmDF.timestamp >= myFarmDF.window.start AND
            |farmDF.timestamp <= myFarmDF.window.end
          """.stripMargin))
      .select("farmDF.owner", "farmDF.fruits", "myFarmDF.fruitsA")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column): _*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointSelf")
      .option("topic", outputTopicSelf)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def testTwoDfAggJoin(): Unit = {
    // aggregated df
    val myFarmDF = farmDF
      .withWatermark("timestamp", "30 seconds")
      .groupBy(
        $"owner"
      )
      .agg(collect_list(col("fruits")) as "fruitsA")
      .select("owner", "fruitsA")

    // save the aggregated df to kafka
    myFarmDF
      .select(to_json(struct(myFarmDF.columns.map(column):_*)).alias("value"))
      .writeStream
      .outputMode("update")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointAgg")
      .option("topic", aggTopic)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)

    // read the aggregated df from kafka as a stream
    val aggDF = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("startingOffsets", "earliest")
      .option("subscribe", aggTopic)
      .load()
      .byteArrayToString("value")
      .withColumn("value", from_json($"value", payloadSchemaA))
      .expand("value")
      .withWatermark("timestamp", "30 seconds")

    // joined df
    val joinedDF = farmDF
      .as("farmDF")
      .join(
        aggDF.as("myFarmDF"),
        expr(
          """
            |farmDF.owner = myFarmDF.owner AND
            |farmDF.timestamp >= myFarmDF.timestamp - interval 1 hour AND
            |farmDF.timestamp <= myFarmDF.timestamp + interval 1 hour
          """.stripMargin))
      .select("farmDF.owner", "myFarmDF.fruitsA", "farmDF.fruits")

    joinedDfSchema = joinedDF.schema

    // stream sink
    joinedDF
      .select(to_json(struct(joinedDF.columns.map(column):_*)).alias("value"))
      .writeStream
      .outputMode("append")
      .option("kafka.bootstrap.servers", brokers)
      .option("checkpointLocation", "/data/kafka/checkpointTwo")
      .option("topic", outputTopicTwo)
      .format("kafka")
      .start()

    // let's give time to process the stream
    Thread.sleep(10000)
  }

  def data(topic: String): DataFrame = {
    // let's read back the output topic using kafka batch
    spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .load()
      .byteArrayToString("value")
      .withColumn("value", from_json($"value", joinedDfSchema))
      .expand("value")
  }
}

Теперь, если я тестирую потоковую передачу DataFrame:

scala> Test.testSelfAggJoinFail
org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Project [structstojson(named_struct(owner, owner#59, fruits, fruits#60, fruitsA, fruitsA#78), Some(Etc/UTC)) AS value#96]
+- Project [owner#59, fruits#60, fruitsA#78]
   +- Project [owner#59, key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, fruits#60, fruitsA#78]
      +- Join Inner, (owner#59 = owner#82)
         :- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, value#51.owner AS owner#59, value#51.fruits AS fruits#60]
         :  +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, jsontostructs(StructField(owner,StringType,true), StructField(fruits,StringType,true), value#43, Some(Etc/UTC), true) AS value#51]
         :     +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, cast(value#30 as string) AS value#43]
         :        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3269e790, kafka, Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092), [key#29, value#30, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@42eeb996,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#22, value#23, topic#24, partition#25, offset#26L, timestamp#27, timestampType#28]
         +- SubqueryAlias myFarmDF
            +- Aggregate [owner#82], [owner#82, collect_list(fruits#83, 0, 0) AS fruitsA#78]
               +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, value#51.owner AS owner#82, value#51.fruits AS fruits#83]
                  +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, jsontostructs(StructField(owner,StringType,true), StructField(fruits,StringType,true), value#43, Some(Etc/UTC), true) AS value#51]
                     +- Project [key#29, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35, cast(value#30 as string) AS value#43]
                        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@3269e790, kafka, Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092), [key#29, value#30, topic#31, partition#32, offset#33L, timestamp#34, timestampType#35], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@42eeb996,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> test.kafka.sink.input, kafka.bootstrap.servers -> kafka:9092),None), kafka, [key#22, value#23, topic#24, partition#25, offset#26L, timestamp#27, timestampType#28]

  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:374)
  at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:110)
  at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:235)
  at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
  at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
  at Test$.testSelfAggJoinFail(<console>:123)
  ... 51 elided

не получается с Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark, потому что я не использую водяные знаки.

Теперь, если я смогу запустить второй тест с

Test.testSelfAggJoin

Я получаю эти предупреждения

2018-09-12 16:07:33 WARN  StreamingJoinHelper:66 - Failed to extract state value watermark from condition (window#70-T30000ms.start - timestamp#139-T30000ms) due to window#70-T30000ms.start
2018-09-12 16:07:33 WARN  StreamingJoinHelper:66 - Failed to extract state value watermark from condition (timestamp#139-T30000ms - window#70-T30000ms.end) due to window#70-T30000ms.end
2018-09-12 16:07:33 WARN  StreamingJoinHelper:66 - Failed to extract state value watermark from condition (window#70-T30000ms.start - timestamp#139-T30000ms) due to window#70-T30000ms.start
2018-09-12 16:07:33 WARN  StreamingJoinHelper:66 - Failed to extract state value watermark from condition (timestamp#139-T30000ms - window#70-T30000ms.end) due to window#70-T30000ms.end

И я могу проверить результат с помощью

Test.data(Test.outputTopicSelf).show(false)
2018-09-12 16:08:01 WARN  NetworkClient:882 - [Consumer clientId=consumer-5, groupId=spark-kafka-relation-02f5512f-cc3c-40ad-938f-e3dfdca95f8c-driver-0] Error while fetching metadata with correlation id 2 : {test.kafka.sink
.output.self=LEADER_NOT_AVAILABLE}
2018-09-12 16:08:01 WARN  NetworkClient:882 - [Consumer clientId=consumer-5, groupId=spark-kafka-relation-02f5512f-cc3c-40ad-938f-e3dfdca95f8c-driver-0] Error while fetching metadata with correlation id 6 : {test.kafka.sink
.output.self=LEADER_NOT_AVAILABLE}
+---+-----+---------+------+---------+-------------+-----+------+-------+
|key|topic|partition|offset|timestamp|timestampType|owner|fruits|fruitsA|
+---+-----+---------+------+---------+-------------+-----+------+-------+
+---+-----+---------+------+---------+-------------+-----+------+-------+

, который возвращает пустой DataFrame (возможно, из-за предупреждения?). Я не смог найти решение с самостоятельным присоединением.

Наконец, я попытался передать агрегат в Kafka и перечитать его как второй поток DataFrame, как в

scala> Test.data(Test.outputTopicTwo).show(false)
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|key |topic                     |partition|offset|timestamp              |timestampType|owner|fruitsA                           |fruits |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|null|test.kafka.sink.output.two|0        |0     |2018-09-12 16:57:04.376|0            |Brian|["avocado","apple","pear","melon"]|avocado|
|null|test.kafka.sink.output.two|0        |1     |2018-09-12 16:57:04.376|0            |Bob  |["apple","avocado"]               |apple  |
|null|test.kafka.sink.output.two|0        |2     |2018-09-12 16:57:04.38 |0            |Brian|["avocado","apple","pear","melon"]|apple  |
|null|test.kafka.sink.output.two|0        |3     |2018-09-12 16:57:04.38 |0            |Bob  |["apple","avocado"]               |avocado|
|null|test.kafka.sink.output.two|0        |4     |2018-09-12 16:57:04.381|0            |Brian|["avocado","apple","pear","melon"]|pear   |
|null|test.kafka.sink.output.two|0        |5     |2018-09-12 16:57:04.382|0            |Brian|["avocado","apple","pear","melon"]|melon  |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+

, который работает (хотя и не очень эффективно, я бы сказал), но если я добавлю дополнительные данные в тему источника:

scala> Test.saveDfToKafka(Test.additionalData)
scala> Test.data(Test.outputTopicTwo).show(false)
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|key |topic                     |partition|offset|timestamp              |timestampType|owner|fruitsA                           |fruits |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+
|null|test.kafka.sink.output.two|0        |0     |2018-09-12 16:57:04.376|0            |Brian|["avocado","apple","pear","melon"]|avocado|
|null|test.kafka.sink.output.two|0        |1     |2018-09-12 16:57:04.376|0            |Bob  |["apple","avocado"]               |apple  |
|null|test.kafka.sink.output.two|0        |2     |2018-09-12 16:57:04.38 |0            |Brian|["avocado","apple","pear","melon"]|apple  |
|null|test.kafka.sink.output.two|0        |3     |2018-09-12 16:57:04.38 |0            |Bob  |["apple","avocado"]               |avocado|
|null|test.kafka.sink.output.two|0        |4     |2018-09-12 16:57:04.381|0            |Brian|["avocado","apple","pear","melon"]|pear   |
|null|test.kafka.sink.output.two|0        |5     |2018-09-12 16:57:04.382|0            |Brian|["avocado","apple","pear","melon"]|melon  |
|null|test.kafka.sink.output.two|0        |6     |2018-09-12 16:59:37.125|0            |Bob  |["apple","avocado"]               |grapes |
|null|test.kafka.sink.output.two|0        |7     |2018-09-12 16:59:40.001|0            |Bob  |["apple","avocado","grapes"]      |apple  |
|null|test.kafka.sink.output.two|0        |8     |2018-09-12 16:59:40.002|0            |Bob  |["apple","avocado","grapes"]      |avocado|
|null|test.kafka.sink.output.two|0        |9     |2018-09-12 16:59:40.002|0            |Bob  |["apple","avocado","grapes"]      |grapes |
+----+--------------------------+---------+------+-----------------------+-------------+-----+----------------------------------+-------+

Я получаю намного больше строк, вероятно потому, что мне пришлось использовать .outputMode("update") при потоплении агрегации Df.

  • Есть ли способ выполнить это объединение, не отправляя объединение обратно в Kafka в качестве отдельной темы?
  • Если нет, можно ли изменить testTwoDfAggJoin на использование .outputMode("append")?

Ответы [ 2 ]

0 голосов
/ 27 июня 2019

Начиная с версии Spark 2.3, объединение двух потоковых DF невозможно, если перед объединением задействованы некоторые агрегатные функции.

Из документации по искрам

Additional details on supported joins:

    Joins can be cascaded, that is, you can do df1.join(df2, ...).join(df3, ...).join(df4, ....).

    As of Spark 2.3, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.

    As of Spark 2.3, you cannot use other non-map-like operations before joins. Here are a few examples of what cannot be used.

        Cannot use streaming aggregations before joins.

        Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.
0 голосов
/ 13 ноября 2018

Я столкнулся с похожей информацией об ошибке, outputMode важен для agg, я решил добавить df.writeStream.outputMode(OutputMode.Update()) или df.writeStream.outputMode(OutputMode.Complete())

Ref:

Режимы выводаСуществует несколько типов режимов вывода.

Режим добавления (по умолчанию) - это режим по умолчанию, при котором в приемную таблицу выводятся только новые строки, добавленные после последнего триггера.Это поддерживается только для тех запросов, в которых строки, добавленные в таблицу результатов, никогда не изменятся.Следовательно, этот режим гарантирует, что каждая строка будет выводиться только один раз (при условии отказоустойчивого приемника).Например, запросы только с выбором, где, карта, flatMap, фильтр, объединение и т. Д. Будут поддерживать режим добавления.

Полный режим - вся таблица результатов будет выводиться в приемник после каждого триггера.Это поддерживается для запросов агрегации.

Режим обновления - (Доступно с Spark 2.1.1) Только те строки в таблице результатов, которые были обновлены с момента последнего триггера, будут выводиться в приемник.Дополнительная информация будет добавлена ​​в будущих выпусках.

http://blog.madhukaraphatak.com/introduction-to-spark-structured-streaming-part-3/

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...