Я пытаюсь записать значения, которые я прочитал из Google Analytics и Google Cloud Storage, для записи в Google Big Query.Я пишу потоковое задание, которое записывает данные при получении из Pub / Sub.Моя главная проблема заключается в том, что я не мог выполнять операции Big Query в функции карты (что затрудняет получение информации о том, существует ли уже значение в большой таблице запросов)
Поток выглядит следующим образом:
Google Analytics -> PubSub -> функция обработки (MAP) -> операция Pcollection to Big Query (io.WritetoBigquery).
Обработка выполняет некоторую проверку значениймежду одним листом Excel и записями PubSub
Теперь я хотел удалить дублирующиеся значения, которые будут введены в таблицу больших запросов, которую я использовал.Я попытался дать id_label
(но мне нужно дать атрибут, а не метку ключа).Я пытался получить count
из большого запроса с помощью Map, но он не работал, так как не мог распознать bigquery.Client()
.Пожалуйста, найдите мой фрагмент кода ниже:
Трубопровод:
pipeline_options1 = PipelineOptions(pipeline_args)
pipeline_options1.view_as(SetupOptions).save_main_session = True
pipeline_options1.view_as(StandardOptions).streaming = True
p_bq = beam.Pipeline(options=pipeline_options1)
logging.info('Start')
"""Pipeline starts. Create creates a PCollection from what we read from Cloud storage"""
test = p_bq | beam.Create(data)
"""The pipeline then reads from pub sub and then combines the pub sub with the cloud storage data"""
BQ_data1 = p_bq | 'readFromPubSub' >> beam.io.ReadFromPubSub(
'topic',id_label="hello") | beam.Map(parse_pubsub, param=AsList(test))
BQ_data1 | beam.io.WriteToBigQuery(table='table', dataset='dataset', project='projectid')
"""Run the pipeline"""
result_bq = p_bq.run()
result_bq.wait_until_finish()
Моя функция подсчета (часть конвейера) :
client = bigquery.Client()
sql_string = """Select count(*) as cout from `dataset.table` where
id='%s' """ % (insert_record['transactionID'])
query_job = client.query(sql_string)
results = query_job.result()
Но запрос не распознан.