Я пытаюсь применить pandas udf к окну структурированного потока pyspark. Проблема в том, что как только поток догоняет текущее состояние, все новые окна как-то содержат только одно значение.
Как вы можетесм. на скриншоте все окна после 2019-10-22T15: 34: 08.730 + 0000 содержат только одно значение. Код, использованный для генерации этого:
@pandas_udf("Count long, Resampled long, Start timestamp, End timestamp", PandasUDFType.GROUPED_MAP)
def myudf(df):
df = df.dropna()
df = df.set_index("Timestamp")
df.sort_index(inplace=True)
# resample the dataframe
resampled = pd.DataFrame()
oidx = df.index
nidx = pd.date_range(oidx.min(), oidx.max(), freq="30S")
resampled["Value"] = df.Value.reindex(oidx.union(nidx)).interpolate('index').reindex(nidx)
return pd.DataFrame([[len(df.index), len(resampled.index), df.index.min(), df.index.max()]], columns=["Count", "Resampled", "Start", "End"])
predictionStream = sensorStream.withWatermark("Timestamp", "90 minutes").groupBy(col("Name"), window(col("Timestamp"), "70 minutes", "5 minutes"))
predictionStream.apply(myudf).writeStream \
.queryName("aggregates") \
.format("memory") \
.start()
Поток получает новые значения каждые 5 минут. Просто окно каким-то образом принимает значения только из последнего пакета, хотя срок годности водяного знака не должен был истечь.
Что-то я делаю не так? Я уже пытался играть с водяным знаком;это никак не повлияло на результат. Мне нужны все значения окна внутри udf.
Я запускаю это в блоках данных на кластере, настроенном на 5.5 LTS ML (включая Apache Spark 2.4.3, Scala 2.11)