Захват ошибок при записи в BigQuery в конвейере данных - PullRequest
0 голосов
/ 08 мая 2018

У меня есть конвейер чтения данных из событий PubSub. При получении сообщения я делаю шаг преобразования, чтобы подогнать данные события к желаемой схеме BigQuery. Однако, если мой созданный ввод не соответствует схеме, я сталкиваюсь с проблемами. Видимо, бесконечно повторяется попытка написать в BigQuery:

Count: 76   RuntimeError: Could not successfully insert rows to BigQuery table

В настоящее время я делаю много ручной проверки того, что входные данные соответствуют схеме, однако в случаях, которые я не рассматривал, я накапливаю RuntimeErrors. Есть ли способ попробовать написать в BigQuery, и в случае неудачи сделать что-то еще с исходным вводом? В качестве альтернативы, есть ли способ попробовать написать несколько раз, а в противном случае молча завершиться с ошибкой без добавления новых RuntimeErrors?

Редактировать: Я использую Python SDK. Вот мой упрощенный конвейер для дальнейшего разъяснения:

with beam.Pipeline(options=options) as pipeline:
    # Read messages from PubSub
    event = (pipeline
             | 'Read from PubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(topic))

    output = (event
              | 'Create output' >> beam.transforms.core.FlatMap(lambda event: [{'input': event}]))

    # Write to Big Query
    _ = (output
         | 'Write log to BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
             table=table,
             dataset=dataset,
             project=project,
             schema=schema,
             create_disposition=beam.io.gcp.bigquery.BigQueryDisposition.CREATE_NEVER,
             write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND))

Если в моей таблице нет столбца 'input', задание умирает. Посмотрев в https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1279 кажется, в этом причина такого поведения. Настроив https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1187 и не вызывая RuntimeError, я могу преодолеть свою проблему, однако это кажется довольно громоздким. У кого-нибудь есть предложения по более простому пути?

Ответы [ 2 ]

0 голосов
/ 16 мая 2018

Beam - Python SDK для потоковой передачи довольно ограничен.

https://beam.apache.org/documentation/sdks/python-streaming/

Экспериментальное выполнение потокового конвейера Python (с некоторыми ограничениями) доступно начиная с Beam SDK версии 2.5.0.

Потоковое выполнение Python в настоящее время не поддерживает следующие функции.

Основные характеристики луча : Эти неподдерживаемые функции Beam распространяются на всех участников.

  • API состояния и таймеров
  • Пользовательский API-интерфейс
  • Splittable DoFn API
  • Обработка поздних данных
  • Пользовательский пользовательский WindowFn

Особенности DataflowRunner : Кроме того, DataflowRunner в настоящее время не поддерживает следующие специфические функции облачного потока данных при выполнении потоковой передачи Python.

  • Потоковое автомасштабирование
  • Обновление существующих конвейеров
  • Облачные шаблоны потоков данных
  • Некоторые функции мониторинга, такие как счетчики мс, отображаемые данные, метрики и количество элементов для преобразований. Однако ведение журнала, водяные знаки и количество элементов для источников поддерживаются.

Больше информации здесь: https://beam.apache.org/documentation/sdks/python-streaming/#unsupported-features

Также проверьте примечания к выпуску ниже в документах DataFlow: Python Dataflow Streaming limitation

0 голосов
/ 08 мая 2018

Если вы написали сам конвейер, вы можете использовать setFailedInsertRetryPolicy в BigQueryIO для InsertRetryPolicy.neverRetry

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...