Я пытаюсь создать задание потока данных в python для записи данных из pubsub в Bigquery, код работает нормально, но для обработки ошибок и загрузки их в новую таблицу bigquery у меня возникли трудности, можете ли вы предложить способ обработкиошибки в функции запуска и загрузки исходного сообщения в новую таблицу.
Эта функция запускает конвейер потока данных и загружает в таблицу больших запросов
def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_topic', dest='input_topic', required=True,
help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
parser.add_argument(
'--output_table', dest='output_table', required=True,
help='Input the table name for bigquery".')
parser.add_argument(
'--output_dataset', dest='output_dataset', required=True,
help='Input the dataset name for bigquery".')
known_args, pipeline_args = parser.parse_known_args(argv)
with beam.Pipeline(argv=pipeline_args) as p:
# Read from PubSub Topic
lines = p | beam.io.ReadFromPubSub(known_args.input_topic)
#Adapt messages from PubSub to BQ table, this needs to be in JSON
lines = lines | beam.Map(parse_pubsub)
#Write to a BQ table
lines | beam.io.WriteToBigQuery(table=known_args.output_table,
dataset=known_args.output_dataset,
project='test-project',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()