Я использую Spark 2.3.0.
Моя проблема заключается в том, что всякий раз, когда я добавляю третью порцию данных в мой каталог ввода, первая партия данных обрабатывается и печатается на консоль. Почему?
val spark = SparkSession
.builder()
.appName("micro1")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.streaming.checkpointLocation", "/user/sas/sparkCheckpoint")
.config("spark.sql.parquet.cacheMetadata","false")
.getOrCreate()
import spark.implicits._
import org.apache.spark.sql.functions._
// Left side of a join
import org.apache.spark.sql.types._
val mySchema = new StructType()
.add("id", IntegerType)
.add("name", StringType)
.add("year", IntegerType)
.add("rating", DoubleType)
.add("duration", IntegerType)
val xmlData = spark
.readStream
.option("sep", ",")
.schema(mySchema)
.csv("tostack")
// Right side of a join
val mappingSchema = new StructType()
.add("id", StringType)
.add("megavol", StringType)
val staticData = spark
.read
.option("sep", ",")
.schema(mappingSchema)
.csv("input_tost_static.csv")
xmlData.createOrReplaceTempView("xmlupdates")
staticData.createOrReplaceTempView("mappingdata")
spark
.sql("select * from xmlupdates a join mappingdata b on a.id=b.id")
.withColumn(
"event_time",
to_utc_timestamp(current_timestamp, Calendar.getInstance().getTimeZone().getID()))
.withWatermark("event_time", "10 seconds")
.groupBy(window($"event_time", "10 seconds", "10 seconds"), $"year")
.agg(
sum($"rating") as "rating",
sum($"duration") as "duration",
sum($"megavol") as "sum_megavol")
.drop("window")
.writeStream
.outputMode("append")
.format("console")
.start
мой вывод показывает данные, как показано ниже: я сначала запустил потоковую передачу, а затем добавил данные в определенную папку. когда я добавляю третий файл, первый файл объединяет результаты. Почему?
-------------------------------------------
Batch: 0
-------------------------------------------
+----+------+--------+-----------+
|year|rating|duration|sum_megavol|
+----+------+--------+-----------+
+----+------+--------+-----------+
-------------------------------------------
Batch: 1
-------------------------------------------
+----+------+--------+-----------+
|year|rating|duration|sum_megavol|
+----+------+--------+-----------+
+----+------+--------+-----------+
-------------------------------------------
Batch: 2
-------------------------------------------
+----+------+--------+-----------+
|year|rating|duration|sum_megavol|
+----+------+--------+-----------+
|1963| 2.8| 5126| 46.0|
|1921| 6.0| 15212| 3600.0|
+----+------+--------+-----------+
Входные данные следующие:
1,The Nightmare Before Christmas,1993,3.9,4568
2,The Mummy,1993,3.5,4388
3,Orphans of the Storm,1921,3.2,9062
4,The Object of Beauty,1921,2.8,6150
5,Night Tide,1963,2.8,5126
6,One Magic Christmas,1963,3.8,5333
7,Muriel's Wedding,1963,3.5,6323
8,Mother's Boys,1963,3.4,5733
input_tost_static.csv
набор данных выглядит следующим образом:
3,3000
4,600
5,46
Может ли кто-нибудь помочь мне, почему потоковое структурирование в искре показывает такое поведение? Нужно ли добавлять какие-либо настройки здесь?
ОБНОВЛЕНИЕ: я получаю результаты в самом пакете 1, если я пытаюсь напечатать значение val перед операцией JOIN ... проблема возникает после присоединения .. его задержка более чем на 3 пакета ....