Как динамически отслеживать состояние в Apache Beam? - PullRequest
0 голосов
/ 22 мая 2019

Я пишу скрипт проверки данных в Apache Beam. Всякий раз, когда новый файл загружается в Google Cloud Storage, этот сценарий получает сообщение от PubSub, загружает файл и запускает набор заранее определенных тестов для файла. В конце этих тестов мне нужно отправить по электронной почте журнал всех строк, которые не прошли их тесты.

Чтобы не отправлять электронное письмо несколько раз, я немного прочел и считаю, что могу отправить электронное письмо один раз с использованием конструкций состояния и таймера в Beam. Однако каждый файл будет иметь разное количество ошибок, поэтому как я могу установить его так, чтобы отправка файла ожидала X элементов, где каждый элемент является ошибкой, прежде чем отправит электронное письмо, а не жестко закодированный номер.

Я пытался использовать DoFn с COUNT_STATE для подсчета элементов, которые ему передаются, но я получаю другую ошибку, когда элемент представляет собой коллекцию Pcollection, а не кортеж K, V.

Вот код конвейера:

with beam.Pipeline(options=pipeline_options) as p:
    # Read Lines from data
    validation = (p
                | "Read Element From PubSub" >> beam.io.ReadFromPubSub (topic=known_args.input_topic)
                | 'Filter Messages' >> beam.ParDo(FilterMessageDoFn(known_args.project, t_options.dataset_id))
                | 'After filter' >> beam.ParDo(DebugFn("DATA VALIDATION: PROCESSING FILE...", show_trace))
                | 'Generate Schemas' >> beam.ParDo(GetSchemaFn(known_args.project, t_options.validation_home_path))
                | 'After GetSschema' >> beam.ParDo(DebugFn("DATA VALIDATION: After OBTAINING SCHEMA...", show_trace))
                | 'Validate' >> beam.ParDo(ValidateFn(known_args.project)).with_outputs(
                ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE,
                ValidateFn.TAG_VALIDATION_CONTENT_FAILURE,
                ValidateFn.TAG_VALIDATION_CONTENT_SUCCESS,
                main='lines')

to_be_joined = ([validation[ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE], 
                validation[ValidateFn.TAG_VALIDATION_CONTENT_FAILURE]]
               | "Group By Key" >> beam.Flatten()
               | 'Persist Global Errors to Big Query' >> beam.ParDo(PersistErrorsFn(known_args.project))
               | 'Debug Errors' >> beam.ParDo(DebugFn("DATA VALIDATION: VALIDATION ERRORS", show_trace))
               | 'Save Global Errors' >> beam.io.WriteToBigQuery('data_management.validation_errors',                                                                 
              project=known_args.project,
              schema=TABLE_SCHEMA, 
              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)

Как правило, я хочу вставить шаг перед тем, как написать в BigQuery, чтобы отправить электронное письмо, которое отправляется только при получении VALIDATION_GLOBAL_FAILURE + VALIDATION_CONTENT_FAILURE количества ошибок.

Спасибо!

1 Ответ

1 голос
/ 23 мая 2019

Идея состоит в том, что вы хотите выполнить CoGroupByKey для двух PCollections, содержащих ошибки проверки, а затем применить DoFn, который применяет логику отправки электронной почты к результату.

ЭтоНепонятно, что это за типы в конвейере, но я собираюсь предположить, что ValidateFn выводит кортеж (file name, validation error) в ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE и ValidateFn.TAG_VALIDATION_CONTENT_FAILURE.

class SendEmail(beam.DoFn):
  def process(self, element):
    file_name = element[0]
    iterable_of_global_failures = element[1].get(ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE)
    iterable_of_content_failures = element[1].get(ValidateFn.TAG_VALIDATION_CONTENT_FAILURE)
    ... format and send e-mail if iterables satisfy requirements ...


# create a dict containing the tag to PCollection mapping for what we want to group together.
validation = (p
              | "Read Element From PubSub" >> beam.io.ReadFromPubSub (topic=known_args.input_topic)
              | 'WindowInto' >> beam.WindowInto(FixedWindows(1))
              | ...

validation_errors = {key: validation[key] for key in [ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE, ValidateFn.TAG_VALIDATION_CONTENT_FAILURE]}

(validation_errors
 | 'CoGroupByKey' >> beam.CoGroupByKey()
 | 'Send Email' >> beam.ParDo(SendEmail())

Поскольку каждая входная запись из PubsubIO представляетимя файла, а затем оно расширяется во все соответствующие записи; все эти записи будут иметь одинаковую метку времени сообщения PubsubIO, частью которого является файл.Это позволяет нам использовать действительно маленький размер окна при группировании, что приводит к меньшим группам и лучшей производительности.Указание WindowInto необходимо, чтобы мы не использовали GlobalWindow, потому что CoGroupByKey никогда не будет запускать вывод.Вы можете узнать больше о потоковой передаче, управлении окнами и запуске [1 , 2] .

...