Как запустить луч Apache для записи в большой запрос на основе условий - PullRequest
0 голосов
/ 18 октября 2018

Я пытаюсь прочитать значения из паба Google и хранилища Google и поместить эти значения в большой запрос на основе условий подсчета, т. Е. Если значения существуют, то не следует вставлять значение, иначе можно вставить значение.

Мой код выглядит следующим образом:

p_bq = beam.Pipeline(options=pipeline_options1)

logging.info('Start')

"""Pipeline starts. Create creates a PCollection from what we read from Cloud storage"""
test = p_bq | beam.Create(data)

"""The pipeline then reads from pub sub and then combines the pub sub with the cloud storage data"""
BQ_data1 = p_bq | 'readFromPubSub' >> beam.io.ReadFromPubSub(
    'mytopic') |  beam.Map(parse_pubsub, param=AsList(test))

где 'data' - это значение из хранилища Google, а чтение из pubsub - это значение из Google Analytics.Parse_pubsub возвращает 2 значения: одно - это словарь, а другое - количество (в котором указано, присутствует или нет значение в таблице)

count=comparebigquery(insert_record)
return (insert_record,count)

Как обеспечить условие для вставки большого запроса, поскольку значение находится в Pcollection


Новое редактирование:

class Process(beam.DoFn):

def process1(self, element, trans):
    if element['id'] in trans:
        # Emit this short word to the main output.
        yield pvalue.TaggedOutput('present',element)
    else:
        # Emit this word's long length to the 'above_cutoff_lengths' output.
        yield pvalue.TaggedOutput(
            'absent', present)

test1 = p_bq | "TransProcess" >> beam.Create(trans)

, где trans - список

BQ_data2 = BQ_data1 | beam.ParDo(Process(),trans=AsList(test1)).with_outputs('present','absent')
present_value=BQ_data2.present
absent_value=BQ_data2.absent

Заранее спасибо

Ответы [ 2 ]

0 голосов
/ 20 октября 2018

Вы можете разделить PCollection в функции ParDo , используя Дополнительные выходы в зависимости от условия.

Не забудьте предоставить выходные теги для функции ParDo .with_outputs()

И при записи элементов PCollection в определенный вывод, используйте .TaggedOutput()

Затем выберите PCollection , который вам нужен, и запишите его в BigQuery.

0 голосов
/ 18 октября 2018

Вы можете использовать

beam.Filter(lambda_function)

после луча. Шаг карты отфильтровывает элементы, которые возвращают False при передаче в lambda_function .

...