Запись данных об ошибках в новую таблицу Bigquery для потока данных GCP в python - PullRequest
0 голосов
/ 17 октября 2019

Я пытаюсь создать задание потока данных в python для записи данных из pubsub в Bigquery, код работает нормально, но для обработки ошибок и загрузки их в новую таблицу bigquery у меня возникли трудности, можете ли вы предложить способ обработкиошибки в функции запуска и загрузки исходного сообщения в новую таблицу.

Эта функция запускает конвейер потока данных и загружает в таблицу больших запросов

  def run(argv=None):      
"""Build and run the pipeline."""

      parser = argparse.ArgumentParser()
      parser.add_argument(
          '--input_topic', dest='input_topic', required=True, 
          help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
      parser.add_argument(
          '--output_table', dest='output_table', required=True, 
          help='Input the table name for bigquery".')
      parser.add_argument(
          '--output_dataset', dest='output_dataset', required=True, 
          help='Input the dataset name for bigquery".') 

      known_args, pipeline_args = parser.parse_known_args(argv)

      with beam.Pipeline(argv=pipeline_args) as p:
        # Read from PubSub Topic 
        lines = p | beam.io.ReadFromPubSub(known_args.input_topic)
        #Adapt messages from PubSub to BQ table, this needs to be in JSON 
        lines = lines | beam.Map(parse_pubsub)
        #Write to a BQ table 
        lines | beam.io.WriteToBigQuery(table=known_args.output_table,
                                        dataset=known_args.output_dataset,
                                        project='test-project',
                                        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
                                        )



    if __name__ == '__main__':
      logging.getLogger().setLevel(logging.INFO)
      run()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...