Я пытаюсь прочитать значения из паба Google и хранилища Google и поместить эти значения в большой запрос на основе условий подсчета, т. Е. Если значения существуют, то не следует вставлять значение, иначе можно вставить значение.
Мой код выглядит следующим образом:
p_bq = beam.Pipeline(options=pipeline_options1)
logging.info('Start')
"""Pipeline starts. Create creates a PCollection from what we read from Cloud storage"""
test = p_bq | beam.Create(data)
"""The pipeline then reads from pub sub and then combines the pub sub with the cloud storage data"""
BQ_data1 = p_bq | 'readFromPubSub' >> beam.io.ReadFromPubSub(
'mytopic') | beam.Map(parse_pubsub, param=AsList(test))
где 'data' - это значение из хранилища Google, а чтение из pubsub - это значение из Google Analytics.Parse_pubsub возвращает 2 значения: одно - это словарь, а другое - количество (в котором указано, присутствует или нет значение в таблице)
count=comparebigquery(insert_record)
return (insert_record,count)
Как обеспечить условие для вставки большого запроса, поскольку значение находится в Pcollection
Новое редактирование:
class Process(beam.DoFn):
def process1(self, element, trans):
if element['id'] in trans:
# Emit this short word to the main output.
yield pvalue.TaggedOutput('present',element)
else:
# Emit this word's long length to the 'above_cutoff_lengths' output.
yield pvalue.TaggedOutput(
'absent', present)
test1 = p_bq | "TransProcess" >> beam.Create(trans)
, где trans - список
BQ_data2 = BQ_data1 | beam.ParDo(Process(),trans=AsList(test1)).with_outputs('present','absent')
present_value=BQ_data2.present
absent_value=BQ_data2.absent
Заранее спасибо