Кумулятивный подсчет с использованием искровой структурированной потоковой передачи - PullRequest
0 голосов
/ 15 октября 2019

Я хочу рассчитать совокупное количество значений в столбце фрейма данных за последние 1 час, используя движущееся окно. Я могу получить ожидаемый результат с помощью оконной функции pyspark (без потоковой передачи), используя rangeBetween, но я хочу использовать обработку данных в реальном времени, поэтому стараюсь использовать потоковую структурированную искру, чтобы, если в систему поступала какая-либо новая запись / транзакция, я получал желаемый результат.

данные похожи на

time,col
2019-04-27 01:00:00,A
2019-04-27 00:01:00,A
2019-04-27 00:05:00,B
2019-04-27 01:01:00,A
2019-04-27 00:08:00,B
2019-04-27 00:03:00,A
2019-04-27 03:03:00,A

с использованием pyspark (без потоковой передачи)

from pyspark.sql.window import Window
df = sqlContext.read.format("csv") \
   .options(header='true', inferschema='false',delimiter=',') \
    .load(r'/datalocation')
df=df.withColumn("numddate",unix_timestamp(df.time, "yyyy-MM-dd HH:mm:ss"))
w1=Window.partitionBy("col").orderBy("numddate").rangeBetween(-3600, -1)
df=df.withColumn("B_cumulative_count", count("col").over(w1))

+-------------------+---+----------+------------------+
|               time|col|  numddate|B_cumulative_count|
+-------------------+---+----------+------------------+
|2019-04-27 00:05:00|  B|1556348700|                 0|
|2019-04-27 00:08:00|  B|1556348880|                 1|
|2019-04-27 00:01:00|  A|1556348460|                 0|
|2019-04-27 00:03:00|  A|1556348580|                 1|
|2019-04-27 01:00:00|  A|1556352000|                 2|
|2019-04-27 01:01:00|  A|1556352060|                 3|
|2019-04-27 03:03:00|  A|1556359380|                 0|
+-------------------+---+----------+------------------+

(This is what I required, so getting it by above code)

Структурированная потоковая передача, вот что я пытаюсь

userSchema = StructType([
    StructField("time", TimestampType()),
    StructField("col", StringType())
])


lines2 = spark \
    .readStream \
.format('csv')\
.schema(userSchema)\
 .csv("/datalocation")

windowedCounts = lines2.groupBy(
    window(lines2.time, "1 hour"),
    lines2.col
).count()

windowedCounts.writeStream.format("memory").outputMode("complete").queryName("test2").option("truncate","false").start()

spark.table("test2").show(truncate=False)

streaming output:
+------------------------------------------+---+-----+
|window                                    |col|count|
+------------------------------------------+---+-----+
|[2019-04-27 03:00:00, 2019-04-27 04:00:00]|A  |1    |
|[2019-04-27 00:00:00, 2019-04-27 01:00:00]|A  |2    |
|[2019-04-27 01:00:00, 2019-04-27 02:00:00]|A  |2    |
|[2019-04-27 00:00:00, 2019-04-27 01:00:00]|B  |2    |
+------------------------------------------+---+-----+

Как тиражировать то же самое с использованием искровой структурированной потоковой передачи?

1 Ответ

0 голосов
/ 15 октября 2019

Вы можете сгруппировать по слайду окна и сосчитать его.

Пример подсчета слов в структурированном потоке -

 val lines = spark.readStream
  .format("socket")
  .option("host", host)
  .option("port", port)
  .option("includeTimestamp", true)
  .load()

// Split the lines into words, retaining timestamps
val words = lines.as[(String, Timestamp)].flatMap(line =>
  line._1.split(" ").map(word => (word, line._2))
).toDF("word", "timestamp")

val windowDuration = "10 seconds"
val slideDuration = "5 seconds"

// Group the data by window and word and compute the count of each group
val windowedCounts = words.groupBy(
  window($"timestamp", windowDuration, slideDuration), $"word"
).count().orderBy("window")

// Start running the query that prints the windowed word counts to the console
    val query = windowedCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()

query.awaitTermination()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...