Многократные вызовы pyspark window () показывают ошибку при выполнении groupBy () - PullRequest
0 голосов
/ 21 января 2019

Этот вопрос является продолжением этого ответа .Spark отображает ошибку, когда возникает следующая ситуация:

# Group results in 12 second windows of "foo", then by integer buckets of 2 for "bar"
fooWindow = window(col("foo"), "12 seconds"))

# A sub bucket that contains values in [0,2), [2,4), [4,6]...
barWindow = window(col("bar").cast("timestamp"), "2 seconds").cast("struct<start:bigint,end:bigint>")

results = df.groupBy(fooWindow, barWindow).count()

Ошибка:

"Несколько выражений временного окна приведут к декартовому произведению строк, поэтому онив настоящее время не поддерживается. "

Есть ли способ достичь желаемого поведения?

1 Ответ

0 голосов
/ 24 января 2019

Мне удалось найти решение, используя адаптацию этого ответа SO .

Примечание. Это решение работает только при наличии не более одного вызова window,это означает, что несколько временных окон не допускаются.Выполнение быстрого поиска на spark github показывает, что существует жесткое ограничение для <= 1 окон.

Используя withColumn для определения сегментов для каждой строки, мы можем затем сгруппировать поновый столбец напрямую:

from pyspark.sql import functions as F
from datetime import datetime as dt, timedelta as td

start = dt.now()
second = td(seconds=1)
data = [(start, 0), (start+second, 1), (start+ (12*second), 2)]
df = spark.createDataFrame(data, ('foo', 'bar'))

# Create a new column defining the window for each bar
df = df.withColumn("barWindow", F.col("bar") - (F.col("bar") % 2))

# Keep the time window as is
fooWindow = F.window(F.col("foo"), "12 seconds").start.alias("foo")

# Use the new column created
results = df.groupBy(fooWindow, F.col("barWindow")).count().show()

# +-------------------+---------+-----+
# |                foo|barWindow|count|
# +-------------------+---------+-----+
# |2019-01-24 14:12:48|        0|    2|
# |2019-01-24 14:13:00|        2|    1|
# +-------------------+---------+-----+
...