Как объединить два набора потоковых данных, когда один набор данных включает агрегирование - PullRequest
0 голосов
/ 19 февраля 2020

Я получаю ошибку ниже в следующем фрагменте кода -

Исключение в потоке "главная" организация. apache .spark. sql .AnalysisException: Добавить режим вывода не поддерживается, когда есть потоковые агрегаты при потоковой передаче данных DataFrames / DataSets без водяного знака ;;

Ниже приведена моя схема ввода

val schema = new StructType()
.add("product",StringType)
.add("org",StringType)
.add("quantity", IntegerType)
.add("booked_at",TimestampType)

Создание набора исходных потоковых данных

val payload_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test1")
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING) as payload")
.select(from_json(col("payload"), schema).as("data"))
.select("data.*")

Создание другой потоковой передачи фрейм данных, в котором выполняется агрегация, а затем объединить его с исходным фреймом данных, чтобы отфильтровать записи

payload_df.createOrReplaceTempView("orders")
    val stage_df = spark.sql("select org, product, max(booked_at) as booked_at from orders group by 1,2")
  stage_df.createOrReplaceTempView("stage")

  val total_qty = spark.sql(
    "select o.* from orders o join stage s on o.org = s.org and o.product = s.product and o.booked_at > s.booked_at ")

Наконец, я пытался отобразить результаты на консоли с режимом добавления выходных данных. Я не могу понять, где мне нужно добавить водяной знак или как решить эту проблему. Моя цель - отфильтровать только те события в каждом триггере, которые имеют более высокую временную метку, чем максимальная временная метка, полученная в любом из более ранних триггеров

total_qty
    .writeStream
    .format("console")
    .outputMode("append")
    .start()
    .awaitTermination()

1 Ответ

0 голосов
/ 19 февраля 2020

При потоковой структуризации с искрой вы можете производить агрегацию непосредственно в потоке только с водяным знаком. Если у вас есть столбец с отметкой времени события, вы можете сделать это следующим образом:

val payload_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test1")
.option("startingOffsets", "latest")
.load()
.selectExpr("CAST(value AS STRING) as payload")
.select(from_json(col("payload"), schema).as("data"))
.select("data.*")
.withWatermark("event_time", "1 minutes")

Для запросов с агрегацией у вас есть 3 типа выходных данных:

  • Режим добавления использует водяной знак для удаления старого состояния агрегации. Но вывод оконной агрегации задерживается на более поздний порог, указанный в withWatermark(), так как в соответствии с семантикой режимов строки могут быть добавлены в таблицу результатов только один раз после их финализации (т. Е. После пересечения водяного знака). Подробнее см. В разделе «Поздние данные».

  • Режим обновления использует водяной знак для удаления старого состояния агрегации.

  • Режим завершения не удаляет старое состояние агрегации, поскольку по определению этот режим сохраняет все данные в таблице результатов.

Редактировать позже: необходимо добавить окно в методе groupBy , val aggFg = payload_df.groupBy ( окно ($ "event_time", "1 минута") , $ "org", $ "product") .agg (max (booked_at) .as ("booked_at") )

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...