Как избежать дублирования данных при записи их в Big Query - PullRequest
0 голосов
/ 19 октября 2018

Я пытаюсь записать значения, которые я прочитал из Google Analytics и Google Cloud Storage, для записи в Google Big Query.Я пишу потоковое задание, которое записывает данные при получении из Pub / Sub.Моя главная проблема заключается в том, что я не мог выполнять операции Big Query в функции карты (что затрудняет получение информации о том, существует ли уже значение в большой таблице запросов)

Поток выглядит следующим образом:

Google Analytics -> PubSub -> функция обработки (MAP) -> операция Pcollection to Big Query (io.WritetoBigquery).

Обработка выполняет некоторую проверку значениймежду одним листом Excel и записями PubSub

Теперь я хотел удалить дублирующиеся значения, которые будут введены в таблицу больших запросов, которую я использовал.Я попытался дать id_label (но мне нужно дать атрибут, а не метку ключа).Я пытался получить count из большого запроса с помощью Map, но он не работал, так как не мог распознать bigquery.Client().Пожалуйста, найдите мой фрагмент кода ниже:

Трубопровод:

pipeline_options1 = PipelineOptions(pipeline_args)
pipeline_options1.view_as(SetupOptions).save_main_session = True
pipeline_options1.view_as(StandardOptions).streaming = True

p_bq = beam.Pipeline(options=pipeline_options1)

logging.info('Start')

"""Pipeline starts. Create creates a PCollection from what we read from Cloud storage"""
test = p_bq | beam.Create(data)

"""The pipeline then reads from pub sub and then combines the pub sub with the cloud storage data"""
BQ_data1 = p_bq | 'readFromPubSub' >> beam.io.ReadFromPubSub(
    'topic',id_label="hello") |  beam.Map(parse_pubsub, param=AsList(test))
BQ_data1 | beam.io.WriteToBigQuery(table='table', dataset='dataset', project='projectid')
"""Run the pipeline"""
result_bq = p_bq.run()

result_bq.wait_until_finish()

Моя функция подсчета (часть конвейера) :

client = bigquery.Client()
sql_string = """Select count(*) as cout from `dataset.table` where
           id='%s' """ % (insert_record['transactionID'])
query_job = client.query(sql_string)
results = query_job.result()

Но запрос не распознан.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...