Я пишу скрипт проверки данных в Apache Beam. Всякий раз, когда новый файл загружается в Google Cloud Storage, этот сценарий получает сообщение от PubSub, загружает файл и запускает набор заранее определенных тестов для файла.
В конце этих тестов мне нужно отправить по электронной почте журнал всех строк, которые не прошли их тесты.
Чтобы не отправлять электронное письмо несколько раз, я немного прочел и считаю, что могу отправить электронное письмо один раз с использованием конструкций состояния и таймера в Beam. Однако каждый файл будет иметь разное количество ошибок, поэтому как я могу установить его так, чтобы отправка файла ожидала X элементов, где каждый элемент является ошибкой, прежде чем отправит электронное письмо, а не жестко закодированный номер.
Я пытался использовать DoFn с COUNT_STATE для подсчета элементов, которые ему передаются, но я получаю другую ошибку, когда элемент представляет собой коллекцию Pcollection, а не кортеж K, V.
Вот код конвейера:
with beam.Pipeline(options=pipeline_options) as p:
# Read Lines from data
validation = (p
| "Read Element From PubSub" >> beam.io.ReadFromPubSub (topic=known_args.input_topic)
| 'Filter Messages' >> beam.ParDo(FilterMessageDoFn(known_args.project, t_options.dataset_id))
| 'After filter' >> beam.ParDo(DebugFn("DATA VALIDATION: PROCESSING FILE...", show_trace))
| 'Generate Schemas' >> beam.ParDo(GetSchemaFn(known_args.project, t_options.validation_home_path))
| 'After GetSschema' >> beam.ParDo(DebugFn("DATA VALIDATION: After OBTAINING SCHEMA...", show_trace))
| 'Validate' >> beam.ParDo(ValidateFn(known_args.project)).with_outputs(
ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE,
ValidateFn.TAG_VALIDATION_CONTENT_FAILURE,
ValidateFn.TAG_VALIDATION_CONTENT_SUCCESS,
main='lines')
to_be_joined = ([validation[ValidateFn.TAG_VALIDATION_GLOBAL_FAILURE],
validation[ValidateFn.TAG_VALIDATION_CONTENT_FAILURE]]
| "Group By Key" >> beam.Flatten()
| 'Persist Global Errors to Big Query' >> beam.ParDo(PersistErrorsFn(known_args.project))
| 'Debug Errors' >> beam.ParDo(DebugFn("DATA VALIDATION: VALIDATION ERRORS", show_trace))
| 'Save Global Errors' >> beam.io.WriteToBigQuery('data_management.validation_errors',
project=known_args.project,
schema=TABLE_SCHEMA,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
Как правило, я хочу вставить шаг перед тем, как написать в BigQuery, чтобы отправить электронное письмо, которое отправляется только при получении VALIDATION_GLOBAL_FAILURE + VALIDATION_CONTENT_FAILURE количества ошибок.
Спасибо!