Недавно я начал работать с Apache Beam и облачным потоком данных Google для разработки больших данных.
перерабатывающий трубопровод. Я планирую использовать внутреннюю модель обработки состояния Beam ,
разработать мой конвейер обработки.
Ниже суть того, чего я хочу достичь
- Чтение данных из неограниченного (потокового) источника, для GCP это будет PubSub. Преобразовать данные в КВ
- Чтение данных из ограниченного источника, для GCP в моем случае это будет GCS. Преобразовать данные в КВ
- Поскольку один из источников не ограничен, я должен выполнить Window и Triggering. Я выбираю использовать Глобальное окно, так как я хочу объединить данные, поступающие из неограниченного источника, в ограниченный источник, если они уже существуют в моем внутреннем управлении состоянием.
- Объединение данных из PubSub и GCS по общему ключу.
- Добавить данные во внутреннее состояние Beam, используя Beam's BagState (StateId и StateSpec). Emit the Iterable данных, добавленных в BagState. Выровняйте итерируемое и запишите выбранное PCollection в GCS
- Выполнить некоторую функцию ParDo (функцию) для повторяемых данных. Записать испущенную PCollection в GCS
Ниже приведен пример кода
public static PipelineResult run(BeamStateFullProcessingPoC.Options options) {
// User - Think of it as Java POJO / Avro record / Protobuf message.
// create the pipeline
Pipeline pipeline = Pipeline.create(options);
/**
* Step - 1
* Read data from non-bounded (streaming) source, for GCP this would be PubSub.
* Transform the data into KV<String, Object>
*/
final PCollection<PubsubMessage> pubsubEvents = ...
final PCollection<KV<String, User>> pubSubUserByUserId = ...
/**
* Step - 2
* Read data from bounded source, for GCP my case this would be GCS.
* Transform the data into KV<String, Object>
*/
final PCollection<User> users = ...
final PCollection<KV<String, User>> gcsUserByUserId = ...
List<PCollection<KV<String, User>>> pCollectionList = new ArrayList<>();
pCollectionList.add(pubSubUserByUserId);
pCollectionList.add(gcsUserByUserId);
PCollection<KV<String, User>> allKVData = PCollectionList
.of(pCollectionList)
.apply("flatten KV ID with User", Flatten.pCollections());
/**
* Step - 3
* Perform Window + Triggering and GroupByKey
* As one of the Source is streaming, we need to do Window and trigger, before grouping by key
*/
final PCollection<KV<String, Iterable<User>>> iterableUserByIdKV = allKVData
.apply("batch data by window + trigger",
Window.<KV<String, User>> into(new GlobalWindows())
.triggering(AfterProcessingTime.pastFirstElementInPane())
.discardingFiredPanes())
.apply("GroupByKey per User", GroupByKey.create());
/**
* Step - 4
* Add User to Beam's internal state, using Beam's BagState (StateId and StateSpec)
* Emit the Iterable<User> added to BagState
* Flatten Iterable, and write the emitted PCollection to GCS
*/
final PCollection<Iterable<User>> iterableUser = iterableUserByIdKV
.apply("User added to State by Key", ParDo.of(new CreateInternalStateDoFn()));
final PCollection<User> userAddedToState = iterableUser
.apply("flatten userAddedToState", Flatten.iterables());
userAddedToState.apply("write userAddedToState", AvroIO.write(User.class)
.to(options.getOutputDirectoryForUserState())
.withSuffix(".avro")
.withWindowedWrites()
.withNumShards(options.getNumShards()));
/**
* Step - 5
* Perform some function via ParDo on Iterable<User>
* Write emitted data to GCS
*/
final PCollection<User> changeGenderUser = iterableUser
.apply("DetectChangeGenderDoFn", ParDo.of(new DetectChangeGenderDoFn()));
changeGenderUser.apply("write change gender", AvroIO.write(User.class)
.to(options.getOutputDirectoryForChangeGender())
.withSuffix(".avro")
.withWindowedWrites()
.withNumShards(options.getNumShards()));
return pipeline.run();
}
Ниже приведен список выплат JSON для создания задания шаблона потока данных.
{
"jobName": "poc-beam-state-management",
"parameters": {
"personSubscription": "projects/<project-name>/subscriptions/<subscription-name>",
"locationForUser": "gs://<bucket>/<user-folder>/*.avro",
"outputDirectoryForChangeGender": "gs://<bucket>/<folder>/",
"outputDirectoryForUserState": "gs://<bucket>/<folder>/",
"avroTempDirectory": "gs://<bucket>/<folder>/",
"numShards": "5",
"autoscalingAlgorithm": "THROUGHPUT_BASED",
"numWorkers": "3",
"maxNumWorkers": "18"
},
"environment": {
"subnetwork": "<some-subnet>",
"zone": "<some-zone>",
"serviceAccountEmail": "<some-service-account>",
},
"gcsPath": "gs://<bucket>/<folder>/templates/<TemplateName>"
}
Когда мое задание потока данных запускается, оно выполняет работу только на 1 рабочем узле.
Я предполагаю, что Google Cloud Platform Data Flow будет автоматически масштабировать работу с рабочими узлами по мере необходимости.
В: Как может поток данных работать, автоматически масштабировать и использовать возможности GCP для выполнения работы в распределенном режиме?
Задание потока данных DAG (подраздел-1)
DAG задания потока данных (подраздел-2)