Как рассчитать разницу лагов в Spark структурированной потоковой передаче? - PullRequest
0 голосов
/ 23 ноября 2018

Я пишу программу Spark Structured Streaming.Мне нужно создать дополнительный столбец с разницей в запаздывании.

Чтобы воспроизвести мою проблему, я предоставляю фрагмент кода.Этот код использует data.json файл, хранящийся в папке data:

[
  {"id": 77,"type": "person","timestamp": 1532609003},
  {"id": 77,"type": "person","timestamp": 1532609005},
  {"id": 78,"type": "crane","timestamp": 1532609005}
]

Код:

from pyspark.sql import SparkSession
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[2]") \
    .getOrCreate()

schema = StructType([
    StructField("id", IntegerType()),
    StructField("type", StringType()),
    StructField("timestamp", LongType())
])

ds = spark \
    .readStream \
    .format("json") \
    .schema(schema) \
    .load("data/")

diff_window = Window.partitionBy("id").orderBy("timestamp")
ds = ds.withColumn("prev_timestamp", func.lag(ds.timestamp).over(diff_window))

query = ds \
    .writeStream \
    .format('console') \
    .start()

query.awaitTermination()

Я получаю эту ошибку:

pyspark.sql.utils.AnalysisException: временные окна не поддерживаются для потоковых фреймов данных / фреймов данных;Строки между 1 и 1) как prev_timestamp # 129L]

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...