Структурированная потоковая передача PySpark применяет udf к окну - PullRequest
0 голосов
/ 22 октября 2019

Я пытаюсь применить pandas udf к окну структурированного потока pyspark. Проблема в том, что как только поток догоняет текущее состояние, все новые окна как-то содержат только одно значение.

py

Как вы можетесм. на скриншоте все окна после 2019-10-22T15: 34: 08.730 + 0000 содержат только одно значение. Код, использованный для генерации этого:

@pandas_udf("Count long, Resampled long, Start timestamp, End timestamp", PandasUDFType.GROUPED_MAP)
def myudf(df):
  df = df.dropna()
  df = df.set_index("Timestamp")
  df.sort_index(inplace=True)

  # resample the dataframe
  resampled = pd.DataFrame()
  oidx = df.index
  nidx = pd.date_range(oidx.min(), oidx.max(), freq="30S")
  resampled["Value"] = df.Value.reindex(oidx.union(nidx)).interpolate('index').reindex(nidx)
  return pd.DataFrame([[len(df.index), len(resampled.index), df.index.min(), df.index.max()]], columns=["Count", "Resampled", "Start", "End"])

predictionStream = sensorStream.withWatermark("Timestamp", "90 minutes").groupBy(col("Name"), window(col("Timestamp"), "70 minutes", "5 minutes"))

predictionStream.apply(myudf).writeStream \
    .queryName("aggregates") \
    .format("memory") \
    .start()

Поток получает новые значения каждые 5 минут. Просто окно каким-то образом принимает значения только из последнего пакета, хотя срок годности водяного знака не должен был истечь.

Что-то я делаю не так? Я уже пытался играть с водяным знаком;это никак не повлияло на результат. Мне нужны все значения окна внутри udf.

Я запускаю это в блоках данных на кластере, настроенном на 5.5 LTS ML (включая Apache Spark 2.4.3, Scala 2.11)

1 Ответ

0 голосов
/ 22 октября 2019

Похоже, что вы можете указать нужный вам режим вывода. WriteStream

См. Документацию по режимам вывода

По умолчанию используется режим добавления:

Это режим по умолчанию, при котором только новые строки, добавленные в таблицу результатов с момента последнего триггера, будут выводиться в приемник.

Попробуйте использовать:

predictionStream.apply(myudf).writeStream \
.queryName("aggregates") \
.format("memory") \
.outputMode(OutputMode.Complete) \
.start()
...