Как передать поток Google Cloud Storage в Big Query с потоком данных без публикации / подписки - PullRequest
0 голосов
/ 14 февраля 2019

Я пытаюсь написать скрипт Python для потоковой передачи данных из моего хранилища Google Cloud Storage в Big Query с помощью конвейерной линии Dataflow.Я могу запустить задание, но оно выполняется как пакетное, а не потоковое, и нам не разрешено использовать Pub / Sub.

Ниже приведен код, который я пробую, с подробными сведениями, сделанными общими:

from __future__ import absolute_import

import argparse
import re
import logging
import apache_beam as beam
import json

from past.builtins import unicode
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions



# This class has all the functions which facilitate data transposition
class WordExtractingDoFn(beam.DoFn):
    def __init__(self):
        super(WordExtractingDoFn, self).__init__()

    # Create Bigquery Row
   dict function
       return
def run_bq(argv=None):
    parser = argparse.ArgumentParser()
    schema1 = your schema
    # All Command Line Arguments being added to the parser
    parser.add_argument(
        '--input', dest='input', required=False,
        default='gs://your-bucket-path/')

    parser.add_argument('--output', dest='output', required=False,
                        default='yourdataset.yourtable')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
        '--runner=DataflowRunner',
        '--project=your-project',
        '--staging_location=gs://your-staging-bucket-path/',
        '--temp_location=gs://your-temp-bucket-path/',
        '--job_name=pubsubbql1',
        '--streaming'
    ])
    pushtobq = WordExtractingDoFn()

    # Pipeline Creation Begins
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p
     | 'Read from a File' >> beam.io.ReadFromText(known_args.input)
     | 'String To BigQuery Row' >> beam.Map(dict-file)
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                    known_args.output,
                    schema=schema2
                )
     )

    # Run Pipeline
    p.run().wait_until_finish()


# Main Method to call
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run_bq()

С помощью приведенного выше кода я могу создавать задания, но они являются пакетными заданиями, мой основной мотив - брать данные из сегментов в формате json, и мне нужно вставить их в BigQuery.

...