У меня есть DataFrame, который содержит столбец массива. Мне нужно выполнить некоторые преобразования (соединения и т. Д.) Для каждого из элементов массива. Я не хочу писать UDF для этого, поскольку он не настолько эффективен по сравнению с простым Spark SQL. Поэтому я решил explode
столбец массива, чтобы получить строку для каждого из элементов, выполнить все преобразования, используя Spark SQL, и, наконец, собрать преобразованные значения, вызвав groupBy(...)
с последующей агрегацией collect_list
. Это отлично работает в пакетном режиме.
В потоковом режиме я должен определить отметку времени с водяным знаком и выполнить агрегирование на основе этой отметки времени. Моя идея состояла в том, чтобы добавить столбец отметки времени (назовем его ingestion_time
) прямо перед тем, как взорвать столбец массива, и установить водяной знак X секунд для этого столбца.
Я сделал следующие наблюдения, которые мне труднообъяснение:
Запись результата в режиме вывода "append" не приведет к записи полного результата в приемник. Спарк просто пишет ок. 20% результата и сохраняет оставшиеся 80% в памяти в течение неопределенного периода времени. Такое поведение воспроизводимо, то есть это всегда те же 20% данных, которые записываются в приемник.
Когда я переключаюсь на «обновление» в режиме вывода, все в порядке, и результат точно такой же, как в пакетном режиме.
Водяной знак не влияет на результат. Я мог бы установить его на сколь угодно малое значение, и результат остался бы точно таким же. Ничто не теряется по сравнению с результатом партии. Это нормально, но я ожидал, что получившиеся массивы будут неполными при небольшом водяном знаке.
У кого-нибудь есть объяснение этому поведению?
Некоторая справочная информация:
Я запускаю задания Spark в Azure Databricks, версия Spark 2.4.2.
Я пробовал водяные знаки от 1 миллисекунды до 1 секунды.
Я установил ingest_time один раз с current_timestamp
из pyspark.sql.functions
и с UDF, который оценивает datetime.utcnow()
(см. Пример ниже) => результаты такие же
df.printSchema()
# root
# |-- id: long (nullable = true)
# |-- value_array: array<float> (nullable = true)
def get_current_timestamp():
return datetime.utcnow()
current_timestamp_udf = udf(get_current_timestamp, TimestampType())
# set timestamp and explode array column
exploded = df \
.withColumn("ingestion_time", current_timestamp_udf()) \
.withColumn("value", explode(col("value_array"))) \
.select("id", "value", "ingestion_time")
# perform some complex transformations
# transformed = exploded...
aggregated = transformed \
.withWatermark("ingestion_time", delayThreshold="1 seconds") \
.groupBy("ingestion_time", "id") \
.agg("value_array", collect_list("transformed_value"))