У меня есть конвейер чтения данных из событий PubSub. При получении сообщения я делаю шаг преобразования, чтобы подогнать данные события к желаемой схеме BigQuery. Однако, если мой созданный ввод не соответствует схеме, я сталкиваюсь с проблемами. Видимо, бесконечно повторяется попытка написать в BigQuery:
Count: 76 RuntimeError: Could not successfully insert rows to BigQuery table
В настоящее время я делаю много ручной проверки того, что входные данные соответствуют схеме, однако в случаях, которые я не рассматривал, я накапливаю RuntimeErrors. Есть ли способ попробовать написать в BigQuery, и в случае неудачи сделать что-то еще с исходным вводом? В качестве альтернативы, есть ли способ попробовать написать несколько раз, а в противном случае молча завершиться с ошибкой без добавления новых RuntimeErrors?
Редактировать: Я использую Python SDK. Вот мой упрощенный конвейер для дальнейшего разъяснения:
with beam.Pipeline(options=options) as pipeline:
# Read messages from PubSub
event = (pipeline
| 'Read from PubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(topic))
output = (event
| 'Create output' >> beam.transforms.core.FlatMap(lambda event: [{'input': event}]))
# Write to Big Query
_ = (output
| 'Write log to BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
table=table,
dataset=dataset,
project=project,
schema=schema,
create_disposition=beam.io.gcp.bigquery.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND))
Если в моей таблице нет столбца 'input', задание умирает. Посмотрев в https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1279 кажется, в этом причина такого поведения. Настроив https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1187 и не вызывая RuntimeError, я могу преодолеть свою проблему, однако это кажется довольно громоздким. У кого-нибудь есть предложения по более простому пути?