Неожиданное поведение Spark при вызове collect_list после взрыва - PullRequest
0 голосов
/ 05 ноября 2019

У меня есть DataFrame, который содержит столбец массива. Мне нужно выполнить некоторые преобразования (соединения и т. Д.) Для каждого из элементов массива. Я не хочу писать UDF для этого, поскольку он не настолько эффективен по сравнению с простым Spark SQL. Поэтому я решил explode столбец массива, чтобы получить строку для каждого из элементов, выполнить все преобразования, используя Spark SQL, и, наконец, собрать преобразованные значения, вызвав groupBy(...) с последующей агрегацией collect_list. Это отлично работает в пакетном режиме.

В потоковом режиме я должен определить отметку времени с водяным знаком и выполнить агрегирование на основе этой отметки времени. Моя идея состояла в том, чтобы добавить столбец отметки времени (назовем его ingestion_time) прямо перед тем, как взорвать столбец массива, и установить водяной знак X секунд для этого столбца.

Я сделал следующие наблюдения, которые мне труднообъяснение:

  1. Запись результата в режиме вывода "append" не приведет к записи полного результата в приемник. Спарк просто пишет ок. 20% результата и сохраняет оставшиеся 80% в памяти в течение неопределенного периода времени. Такое поведение воспроизводимо, то есть это всегда те же 20% данных, которые записываются в приемник.

  2. Когда я переключаюсь на «обновление» в режиме вывода, все в порядке, и результат точно такой же, как в пакетном режиме.

  3. Водяной знак не влияет на результат. Я мог бы установить его на сколь угодно малое значение, и результат остался бы точно таким же. Ничто не теряется по сравнению с результатом партии. Это нормально, но я ожидал, что получившиеся массивы будут неполными при небольшом водяном знаке.

У кого-нибудь есть объяснение этому поведению?

Некоторая справочная информация:

  • Я запускаю задания Spark в Azure Databricks, версия Spark 2.4.2.

  • Я пробовал водяные знаки от 1 миллисекунды до 1 секунды.

  • Я установил ingest_time один раз с current_timestamp из pyspark.sql.functions и с UDF, который оценивает datetime.utcnow() (см. Пример ниже) => результаты такие же


df.printSchema()
# root
# |-- id: long (nullable = true)
# |-- value_array: array<float> (nullable = true)

def get_current_timestamp():
  return datetime.utcnow()

current_timestamp_udf = udf(get_current_timestamp, TimestampType())

# set timestamp and explode array column
exploded = df \
  .withColumn("ingestion_time", current_timestamp_udf()) \
  .withColumn("value", explode(col("value_array"))) \ 
  .select("id", "value", "ingestion_time")

# perform some complex transformations
# transformed = exploded...

aggregated = transformed \
  .withWatermark("ingestion_time", delayThreshold="1 seconds") \
  .groupBy("ingestion_time", "id") \
  .agg("value_array", collect_list("transformed_value"))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...