Окно группировки Spark Structured Streaming - я хочу, чтобы первый интервал начинался с первой отметки времени - PullRequest
1 голос
/ 09 апреля 2019

Из простого полного примера использования агрегирования окон в Spark 2.31 (HDP 3.0) я вижу, что Spark создает интервалы, которые выровнены по некоторому целому числу.Например, здесь я указываю 60 секунд windowDuration, и Spark запускает первый интервал с ближайшей минуты:

>>> from pyspark.sql import functions as F
>>> df = spark.createDataFrame([("2016-03-11 09:00:07", 1),("2016-03-11 09:00:08", 1)]).toDF("date", "val")
>>> w = df.groupBy(F.window("date", "60 seconds")).agg(F.sum("val").alias("sum"))
>>> w.select(w.window.start.cast("string").alias("start"),w.window.end.cast("string").alias("end"), "sum").collect()
[Row(start='2016-03-11 09:00:00', end='2016-03-11 09:01:00', sum=2)]

Есть ли способ начать интервал с первого сообщения, т.е. в моем случае я бы хотелиметь:

[Row(start='2016-03-11 09:00:07', end='2016-03-11 09:01:07', sum=2)]

1 Ответ

1 голос
/ 10 апреля 2019

Вот, пожалуйста =>

from pyspark.sql import functions as F
from datetime import datetime

df = spark.createDataFrame([("2016-03-11 09:00:07", 1),("2016-03-11 09:00:08", 1)]).toDF("date", "val")

startSecond = datetime.strptime(df.head()[0], '%Y-%m-%d %H:%M:%S').second

w = df.groupBy(F.window("date", "60 seconds", "60 seconds", str(startSecond) + " seconds")).agg(F.sum("val").alias("sum"))

w.select(w.window.start.cast("string").alias("start"),w.window.end.cast("string").alias("end"), "sum").collect()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...