Невозможно динамически запустить конвейер с несколькими потоками (конвейер от N до N) (с использованием провайдера значений времени выполнения) в одном задании потока данных в Python - PullRequest
0 голосов
/ 17 июня 2019

Я пытаюсь запустить потоковое задание потока данных, содержащее n конвейеров.

На основе настроенной темы и соответствующей таблицы BQ для каждой темы. Я хочу запустить конвейер внутри одного потокового задания.

Моя настоящая проблема заключается в том, что мне нужно создать и загрузить шаблон для каждой икаждый проект.Что я хочу, так это то, что я могу повторно использовать загруженный шаблон и только файлы конфигурации, которые я должен передать для запуска нового задания потока данных, изменив тему, подписку, набор данных и таблицу bq.

Что я не могу повторно использовать шаблон.

Пожалуйста, помогите мне в этом и дайте мне знать, если это возможно или нет.Потому что Google также предоставил один к одному шаблону.Шаблон «не много ко многим» (например, «Три темы» - «Три таблицы BQ» (три конвейера данных), nn).

import logging
import os
import json
from google.cloud import storage
from apache_beam import Pipeline, ParDo, DoFn
from apache_beam.io import ReadFromPubSub, WriteToBigQuery, BigQueryDisposition
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, WorkerOptions, GoogleCloudOptions, \
    SetupOptions


def _get_storage_service():
    storage_client = storage.Client \
        .from_service_account_json(
        json_credentials_path='C:\Users\dneema\PycharmProjects\iot_dataflow\df_stm_iot_pubsub_bq\service_account_credentials.json')
    print('storage service fetched')
    return storage_client


class RuntimeOptions(PipelineOptions):

    def __init__(self, flags=None, **kwargs):
        super(RuntimeOptions, self).__init__(flags, **kwargs)

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--bucket_name', type=str)
        parser.add_value_provider_argument('--config_json_path', type=str,)


class PipelineCreator:

    def __init__(self):
        self.options = PipelineOptions()
        storage_client = storage.Client.from_service_account_json(
            'service_account_credentials_updated.json')

        runtime_options = self.options.view_as(RuntimeOptions)
        bucket_name = str(runtime_options.bucket_name)
        config_json_path = str(runtime_options.config_json_path)

        # get the bucket with name
        bucket = storage_client.get_bucket(bucket_name)

        # get bucket file as blob
        blob = bucket.get_blob(config_json_path)

        # convert to string and load config
        json_data = blob.download_as_string()
        self.configData = json.loads(json_data)

        dataflow_config = self.configData['dataflow_config']
        self.options.view_as(StandardOptions).streaming = bool(dataflow_config['streaming'])
        self.options.view_as(SetupOptions).save_main_session = True

        worker_options = self.options.view_as(WorkerOptions)
        worker_options.max_num_workers = int(dataflow_config['max_num_worker'])
        worker_options.autoscaling_algorithm = str(dataflow_config['autoscaling_algorithm'])
        #worker_options.machine_type = str(dataflow_config['machine_type'])
        #worker_options.zone = str(dataflow_config['zone'])
        #worker_options.network = str(dataflow_config['network'])
        #worker_options.subnetwork = str(dataflow_config['subnetwork'])

    def run(self):
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'dataflow-service-account.json'

        project_id = self.configData['project_id']
        dataset_id = self.configData['dataset_id']
        topics = self.configData['topics']
        table_ids = self.configData['bq_table_ids']
        error_table_id = self.configData['error_table_id']

        logger = logging.getLogger(project_id)
        logger.info(self.options.display_data())

        pipeline = Pipeline(options=self.options)

        size = len(topics)
        for index in range(size):
            print(topics[index])
            pipeline_name = "pipeline_"+str(index)
            logger.info("Launch pipeline :: "+pipeline_name)
            messages = pipeline | 'Read PubSub Message in ' + pipeline_name >> ReadFromPubSub(topic=topics[index])
            logger.info("Read PubSub Message")
            valid_messages, invalid_messages = messages  | 'Convert Messages to TableRows in ' + pipeline_name >> ParDo(TransformMessageToTableRow()).with_outputs('invalid', main='valid')
            valid_messages | 'Write Messages to BigQuery in ' + pipeline_name >> WriteToBigQuery(table=table_ids[index],
                                                                                               dataset=dataset_id,
                                                                                               project=project_id,
                                                                                          write_disposition=BigQueryDisposition.WRITE_APPEND)

        pipeline.run().wait_until_finish()

class TransformMessageToTableRow(DoFn):

    def process(self, element, *args, **kwargs):
        logging.getLogger('dataflow').log(logging.INFO, element)
        print element
        print("element type ", type(element))
        print("inside bq pardo")
        import json
        try:
            message_rows = json.loads(element)

            # if using emulator, uncomment below line
            message_rows = json.loads(message_rows)
            print 'loaded element'
        except:
            try:
                element = "[" + element + "]"
                message_rows = json.loads(element)
            except Exception as e:
                print(e)
                from apache_beam import pvalue
                yield [pvalue.TaggedOutput('invalid', [element, str(e)])]
        print(message_rows)
        print("message rows", type(message_rows))
        if not isinstance(message_rows, list):
            message_rows = [message_rows]
        #rows = list()
        if isinstance(message_rows, list):

            for row in message_rows:
                try:
                    new_row = dict()
                    for k, v in row.items():
                        new_row[str(k)] = v
                    #rows.append(new_row)
                    print(new_row)
                    yield new_row
                except Exception as e:
                    print(e)
                    from apache_beam import pvalue
                    yield pvalue.TaggedOutput('invalid', [row, str(e)])

if __name__ == '__main__':
        PipelineCreator().run()

Здесь аргумент времени выполнения в виде bucket_name и config_json_path для всех связанных с конфигурацией материалов, таких как набор данных, таблица BQ, разделы / подписка и все параметры рабочего процесса.

Это возможно или нет?Потому что Google также предоставил один к одному шаблону.Шаблон «Не много ко многим» (например, «Три темы» - «Три таблицы BQ» (три конвейера данных), nn).

...