Условное утверждение Python Apache Beam конвейер - PullRequest
0 голосов
/ 13 ноября 2018

Текущая ситуация

Типичным объектом этого конвейера является чтение из pub / sub полезной нагрузки с геоданными, затем эти данные преобразуются, анализируются и, наконец, возвращаются, если условие истинно или ложно

 with beam.Pipeline(options=pipeline_options) as p:
        raw_data = (p
                    | 'Read from PubSub' >> beam.io.ReadFromPubSub(
                    subscription='projects/XXX/subscriptions/YYY'))

        geo_data = (raw_data
                    | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s)))



def GeoDataIngestion(string_input):
    <...>
    return True or False

Желаемая ситуация 1

Если результат GeoDataIngestion равен true, тогда raw_data будет сохранен в большом запросе

geo_data = (raw_data
                | 'Geo data transform' >> beam.Map(lambda s: GeoDataIngestion(s))
                | 'Evaluate condition' >> beam.Map(lambda s: Condition(s))
                )

def Condition(condition):
    if condition:
        <...WriteToBigQuery...>


#The class I used before to store raw_data without depending on evaluate condition:

class WriteToBigQuery(beam.PTransform):
    def expand(self, pcoll):
        return (
                pcoll
                | 'Format' >> beam.ParDo(FormatBigQueryFn())
                | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            'XXX',
            schema=TABLE_SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

Желаемая ситуация 2

Вместо того, чтобы хранить данные в BigQuery, было бы также хорошо отправить в pub / sub

def Condition(condition):
    if condition:
        <...SendToPubSub(Topic1)...>
    else:
        <...SendToPubSub(Topic2)...>

Здесь проблема состоит в том, чтобы установить тему в зависимости от результата условия, потому что я не могу передать тему как параметр в конвейере

 | beam.io.WriteStringsToPubSub(TOPIC)

Ни в одной функции / классе

Вопрос

Как я могу это сделать?

Как / где я должен вызвать WriteToBigQuery для хранения необработанных данных PCollection, если результат условия Evaluate равен true?

1 Ответ

0 голосов
/ 18 ноября 2018

Я думаю, что ветвление коллекций на основе результата условия оценки может быть полезным для вашего сценария. Пожалуйста, смотрите документацию здесь .

Чтобы проиллюстрировать разветвление, предположим, что у меня есть коллекция ниже, где вы хотите выполнить другое действие в зависимости от содержимого строки.

'this line is for BigQuery',
'this line for pubsub topic1',
'this line for pubsub topic2'

Приведенный ниже код создаст тег для коллекции, и вы можете получить три разных PCollections на основе тега. Затем вы можете решить, какие дальнейшие действия вы хотите выполнить с отдельными коллекциями.

import apache_beam as beam
from apache_beam import pvalue
import sys

class Split(beam.DoFn):

    # These tags will be used to tag the outputs of this DoFn.
    OUTPUT_TAG_BQ = 'BigQuery'
    OUTPUT_TAG_PS1 = 'pubsub topic1'
    OUTPUT_TAG_PS2 = 'pubsub topic2'

    def process(self, element):
        """
        tags the input as it processes the orignal PCollection
        """
        print element
        if "BigQuery" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_BQ, element)
            print 'found bq'
        elif "pubsub topic1" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS1, element)
        elif "pubsub topic2" in element:
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_PS2, element)


if __name__ == '__main__':
    output_prefix = 'C:\\pythonVirtual\\Mycodes\\output'
    p = beam.Pipeline(argv=sys.argv)
    lines = (p
            | beam.Create([
               'this line is for BigQuery',
               'this line for pubsub topic1',
               'this line for pubsub topic2']))

    # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
    tagged_lines_result = (lines
                          | beam.ParDo(Split()).with_outputs(
                              Split.OUTPUT_TAG_BQ,
                              Split.OUTPUT_TAG_PS1,
                              Split.OUTPUT_TAG_PS2))

    # tagged_lines_result is an object of type DoOutputsTuple. It supports
    # accessing result in alternative ways.
    bq_records = tagged_lines_result[Split.OUTPUT_TAG_BQ]| "write BQ" >> beam.io.WriteToText(output_prefix + 'bq')
    ps1_records = tagged_lines_result[Split.OUTPUT_TAG_PS1] | "write PS1" >> beam.io.WriteToText(output_prefix + 'ps1')
    ps2_records = tagged_lines_result[Split.OUTPUT_TAG_PS2] | "write PS2" >> beam.io.WriteToText(output_prefix + 'ps2')

    p.run().wait_until_finish()

Пожалуйста, дайте мне знать, если это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...