потоковый агрегат не записывается в приемник - PullRequest
1 голос
/ 27 сентября 2019

Я должен обработать некоторые файлы, которые приходят ко мне ежедневно.Информация имеет первичный ключ (date, client_id, operation_id).Поэтому я создал поток, который добавляет только новые данные в дельта-таблицу:

operations\
        .repartition('date')\
        .writeStream\
        .outputMode('append')\
        .trigger(once=True)\
        .option("checkpointLocation", "/mnt/sandbox/operations/_chk")\
        .format('delta')\
        .partitionBy('date')\
        .start('/mnt/sandbox/operations')

Это работает нормально, но мне нужно обобщить эту информацию, сгруппированную по (date, client_id), поэтому я создал другую потоковую передачу изэта таблица операций в новую таблицу.Поэтому я попытался преобразовать свое поле date во временную метку, чтобы я мог использовать режим добавления при записи агрегированного потока:

import pyspark.sql.functions as F

summarized= spark.readStream.format('delta').load('/mnt/sandbox/operations')
summarized= summarized.withColumn('timestamp_date',F.to_timestamp('date'))
summarized= summarized.withWatermark('timestamp_date','1 second').groupBy('client_id','date','timestamp_date').agg(<lot of aggs>)

summarized\
        .repartition('date')\
        .writeStream\
        .outputMode('append')\
        .option("checkpointLocation", "/mnt/sandbox/summarized/_chk")\
        .trigger(once=True)\
        .format('delta')\
        .partitionBy('date')\
        .start('/mnt/sandbox/summarized')

Этот код выполняется, но ничего не записывает в приемник.

почему он не записывает результаты в сток?

1 Ответ

0 голосов
/ 27 сентября 2019

Здесь могут быть две проблемы.

Неправильный ввод даты

Я совершенно уверен, что проблема связана с F.to_timestamp('date'), который дает null из-за некорректного ввода.

Если это так, withWatermark('timestamp_date','1 second') никогда не может быть "материализован" и не приводит к выводу.

Не могли бы вы spark.read.format('delta').load('/mnt/sandbox/operations') (до read, а не readStream) и посмотреть, если преобразованиедает правильные значения?

spark.\
  read.\ 
  format('delta').\
  load('/mnt/sandbox/operations').\
  withColumn('timestamp_date',F.to_timestamp('date')).\
  show

Все строки используют одну и ту же метку времени

Также возможно, что withWatermark('timestamp_date','1 second') не завершается (и, таким образом, "завершает" агрегацию), потому что все строки изта же временная метка, чтобы время не сдвигалось.

У вас должны быть строки с разными временными метками, чтобы понятие времени для timestamp_date могло пройти за окном задержки '1 second'.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...