Почему потоковая агрегация всегда задерживается до двух пакетов данных? - PullRequest
0 голосов
/ 26 января 2019

Я использую Spark 2.3.0.

Моя проблема заключается в том, что всякий раз, когда я добавляю третью порцию данных в мой каталог ввода, первая партия данных обрабатывается и печатается на консоль. Почему?

val spark = SparkSession
  .builder()
  .appName("micro1")
  .enableHiveSupport()
  .config("hive.exec.dynamic.partition", "true")
  .config("hive.exec.dynamic.partition.mode", "nonstrict")
  .config("spark.sql.streaming.checkpointLocation", "/user/sas/sparkCheckpoint")
  .config("spark.sql.parquet.cacheMetadata","false")
  .getOrCreate()

import spark.implicits._
import org.apache.spark.sql.functions._

// Left side of a join
import org.apache.spark.sql.types._
val mySchema = new StructType()
  .add("id", IntegerType)
  .add("name", StringType)
  .add("year", IntegerType)
  .add("rating", DoubleType)
  .add("duration", IntegerType)
val xmlData = spark
  .readStream
  .option("sep", ",")
  .schema(mySchema)
  .csv("tostack")

// Right side of a join
val mappingSchema = new StructType()
  .add("id", StringType)
  .add("megavol", StringType)
val staticData = spark
  .read
  .option("sep", ",")
  .schema(mappingSchema)
  .csv("input_tost_static.csv") 

xmlData.createOrReplaceTempView("xmlupdates")
staticData.createOrReplaceTempView("mappingdata")

spark
  .sql("select * from xmlupdates a join mappingdata b on  a.id=b.id")
  .withColumn(
    "event_time",
    to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))
  .withWatermark("event_time", "10 seconds")
  .groupBy(window($"event_time", "10 seconds", "10 seconds"), $"year")
  .agg(
    sum($"rating") as "rating",
    sum($"duration") as "duration",
    sum($"megavol") as "sum_megavol")
  .drop("window")
  .writeStream
  .outputMode("append")
  .format("console")
  .start

мой вывод показывает данные, как показано ниже: я сначала запустил потоковую передачу, а затем добавил данные в определенную папку. когда я добавляю третий файл, первый файл объединяет результаты. Почему?

     -------------------------------------------
     Batch: 0
     -------------------------------------------
     +----+------+--------+-----------+
     |year|rating|duration|sum_megavol|
     +----+------+--------+-----------+
     +----+------+--------+-----------+

     -------------------------------------------
     Batch: 1
     -------------------------------------------
     +----+------+--------+-----------+
     |year|rating|duration|sum_megavol|
     +----+------+--------+-----------+
     +----+------+--------+-----------+

     -------------------------------------------
     Batch: 2
     -------------------------------------------
     +----+------+--------+-----------+
     |year|rating|duration|sum_megavol|
     +----+------+--------+-----------+
     |1963|   2.8|    5126|       46.0|
     |1921|   6.0|   15212|     3600.0|
     +----+------+--------+-----------+

Входные данные следующие:

1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1993,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1921,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1963,3.8,5333
7,Muriel's Wedding,1963,3.5,6323
8,Mother's Boys,1963,3.4,5733

input_tost_static.csv набор данных выглядит следующим образом:

3,3000
4,600
5,46

Может ли кто-нибудь помочь мне, почему потоковое структурирование в искре показывает такое поведение? Нужно ли добавлять какие-либо настройки здесь? ОБНОВЛЕНИЕ: я получаю результаты в самом пакете 1, если я пытаюсь напечатать значение val перед операцией JOIN ... проблема возникает после присоединения .. его задержка более чем на 3 пакета ....

1 Ответ

0 голосов
/ 28 января 2019

Сначала я запустил потоковую передачу

Пакет: 0 выполняется сразу после того, как вы запустили запрос, и, учитывая, что ни одно событие не было передано в потоковом режиме, выходных данных нет.

На этом этапе водяной знак времени события вообще не устанавливается.

и более поздние данные добавляются в определенную папку.

Это может быть Пакет: 1 .

Водяной знак времени события был установлен на current_timestamp.Чтобы получить какой-либо вывод, нам нужно подождать "10 seconds" (в соответствии с withWatermark("event_time", "10 seconds")).

, когда я добавляю третий файл, первые результаты агрегированного файла печатаются.Почему?

Это может быть Пакет: 2 .

Я предполагаю, что при следующем добавлении новых файлов это было после предыдущего current_timestamp + "10 seconds"и, таким образом, вы получили вывод.

Обратите внимание, что водяной знак может быть просто 0, что означает, что поздних данных не ожидается.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...