Объединение ограниченного и неограниченного источника, задание потока данных не масштабируется - PullRequest
1 голос
/ 19 марта 2019

Недавно я начал работать с Apache Beam и облачным потоком данных Google для разработки больших данных. перерабатывающий трубопровод. Я планирую использовать внутреннюю модель обработки состояния Beam , разработать мой конвейер обработки.

Ниже суть того, чего я хочу достичь

  • Чтение данных из неограниченного (потокового) источника, для GCP это будет PubSub. Преобразовать данные в КВ
  • Чтение данных из ограниченного источника, для GCP в моем случае это будет GCS. Преобразовать данные в КВ
  • Поскольку один из источников не ограничен, я должен выполнить Window и Triggering. Я выбираю использовать Глобальное окно, так как я хочу объединить данные, поступающие из неограниченного источника, в ограниченный источник, если они уже существуют в моем внутреннем управлении состоянием.
  • Объединение данных из PubSub и GCS по общему ключу.
  • Добавить данные во внутреннее состояние Beam, используя Beam's BagState (StateId и StateSpec). Emit the Iterable данных, добавленных в BagState. Выровняйте итерируемое и запишите выбранное PCollection в GCS
  • Выполнить некоторую функцию ParDo (функцию) для повторяемых данных. Записать испущенную PCollection в GCS

Ниже приведен пример кода

public static PipelineResult run(BeamStateFullProcessingPoC.Options options) {

    // User - Think of it as Java POJO / Avro record / Protobuf message.

    // create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /**
    *   Step - 1
    *   Read data from non-bounded (streaming) source, for GCP this would be PubSub.  
    *   Transform the data into KV<String, Object>
    */
    final PCollection<PubsubMessage> pubsubEvents = ... 
    final PCollection<KV<String, User>> pubSubUserByUserId = ...

    /**
    *   Step - 2
    *   Read data from bounded source, for GCP my case this would be GCS. 
    *   Transform the data into KV<String, Object>
    */
    final PCollection<User> users = ...
    final PCollection<KV<String, User>> gcsUserByUserId = ...

    List<PCollection<KV<String, User>>> pCollectionList = new ArrayList<>();
    pCollectionList.add(pubSubUserByUserId);
    pCollectionList.add(gcsUserByUserId);

    PCollection<KV<String, User>> allKVData = PCollectionList
                  .of(pCollectionList)
                  .apply("flatten KV ID with User", Flatten.pCollections());


    /**
    *   Step - 3 
    *   Perform Window + Triggering and GroupByKey
    *   As one of the Source is streaming, we need to do Window and trigger, before grouping by key
    */
    final PCollection<KV<String, Iterable<User>>> iterableUserByIdKV = allKVData
            .apply("batch data by window + trigger",
                    Window.<KV<String, User>> into(new GlobalWindows())
                    .triggering(AfterProcessingTime.pastFirstElementInPane())
                    .discardingFiredPanes())
    .apply("GroupByKey per User", GroupByKey.create());

    /**
    *   Step - 4
    *   Add User to Beam's internal state, using Beam's BagState (StateId and StateSpec)
    *   Emit the Iterable<User> added to BagState
    *   Flatten Iterable, and write the emitted PCollection to GCS
    */    
    final PCollection<Iterable<User>> iterableUser = iterableUserByIdKV
            .apply("User added to State by Key", ParDo.of(new CreateInternalStateDoFn()));

    final PCollection<User> userAddedToState = iterableUser
            .apply("flatten userAddedToState", Flatten.iterables());
    userAddedToState.apply("write userAddedToState", AvroIO.write(User.class)
            .to(options.getOutputDirectoryForUserState())
            .withSuffix(".avro")
            .withWindowedWrites()
            .withNumShards(options.getNumShards()));


    /**
    *   Step - 5
    *   Perform some function via ParDo on Iterable<User> 
    *   Write emitted data to GCS
    */
    final PCollection<User> changeGenderUser = iterableUser
            .apply("DetectChangeGenderDoFn", ParDo.of(new DetectChangeGenderDoFn()));

    changeGenderUser.apply("write change gender", AvroIO.write(User.class)
            .to(options.getOutputDirectoryForChangeGender())
            .withSuffix(".avro")
            .withWindowedWrites()
            .withNumShards(options.getNumShards()));

    return pipeline.run();
}

Ниже приведен список выплат JSON для создания задания шаблона потока данных.

{
  "jobName": "poc-beam-state-management",
  "parameters": {
    "personSubscription": "projects/<project-name>/subscriptions/<subscription-name>",
    "locationForUser": "gs://<bucket>/<user-folder>/*.avro",
    "outputDirectoryForChangeGender": "gs://<bucket>/<folder>/",
    "outputDirectoryForUserState": "gs://<bucket>/<folder>/",
    "avroTempDirectory": "gs://<bucket>/<folder>/",
    "numShards": "5",
    "autoscalingAlgorithm": "THROUGHPUT_BASED",
    "numWorkers": "3",
    "maxNumWorkers": "18"    
  },
  "environment": {
    "subnetwork": "<some-subnet>",
    "zone": "<some-zone>",
    "serviceAccountEmail": "<some-service-account>",
  },
  "gcsPath": "gs://<bucket>/<folder>/templates/<TemplateName>"
}

Когда мое задание потока данных запускается, оно выполняет работу только на 1 рабочем узле.
Я предполагаю, что Google Cloud Platform Data Flow будет автоматически масштабировать работу с рабочими узлами по мере необходимости.

В: Как может поток данных работать, автоматически масштабировать и использовать возможности GCP для выполнения работы в распределенном режиме?

Задание потока данных DAG (подраздел-1) DAG задания потока данных (подраздел-2)

1 Ответ

0 голосов
/ 25 марта 2019

Итак, ваше базовое предположение в приведенном выше списке имеет некоторые недостатки.Вы утверждаете, что Поскольку один из источников не ограничен ... Я выбираю использовать Глобальное окно, так как хочу объединить данные, поступающие из неограниченного источника, в ограниченный источник ... подчеркнултекст

В Beam вы не можете выполнить глобальное окно для бесконечного потока, поскольку вы не можете поместить поток в память.Вам необходимо разместить их в фиксированных окнах, которые вы можете прочитать здесь .Из-за глобального окна задание никогда не будет завершено.

Во-вторых, если вы запускаете потоковое задание потока данных, то по умолчанию в Google задано значение autoscalingAlgorithm=NONE.Вы хотели бы указать это autoscalingAlgorithm=THROUGHPUT_BASED.Вы можете найти детали здесь , которые объясняют это лучше.Это предотвращает автоматическое масштабирование вашей машины.

Надеюсь, что это ответ на ваш вопрос.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...