Поток данных: Как создать конвейер из уже существующей PCollection, извергнутой другим конвейером - PullRequest
0 голосов
/ 08 июня 2018

Я пытаюсь разделить мой конвейер на множество меньших конвейеров, чтобы они выполнялись быстрее.Я делю PCollection BLOB-объектов Google Cloud Storage (PCollection) так, чтобы я получил оттуда

    PCollectionList<Blob> collectionList

, и я хотел бы получить что-то вроде:

    Pipeline p2 = Pipeline.create(collectionList.get(0));
    .apply(stuff)
    .apply(stuff)

    Pipeline p3 = Pipeline.create(collectionList.get(1));
    .apply(stuff)
    .apply(stuff)

Но яЯ не нашел никакой документации о создании начальной PCollection из уже существующей PCollection, я был бы очень признателен, если кто-нибудь может указать мне правильное направление.Спасибо!

1 Ответ

0 голосов
/ 12 июня 2018

Вы должны посмотреть на преобразование Partition, чтобы разбить PCollection на N меньших.Вы можете предоставить PartitionFn, чтобы определить, как выполняется разделение.Ниже приведен пример из руководства по программированию Beam :

// Provide an int value with the desired number of result partitions, and a PartitionFn that represents the partitioning function.
// In this example, we define the PartitionFn in-line.
// Returns a PCollectionList containing each of the resulting partitions as individual PCollection objects.
PCollection<Student> students = ...;
// Split students up into 10 partitions, by percentile:
PCollectionList<Student> studentsByPercentile =
    students.apply(Partition.of(10, new PartitionFn<Student>() {
        public int partitionFor(Student student, int numPartitions) {
            return student.getPercentile()  // 0..99
                 * numPartitions / 100;
        }}));

// You can extract each partition from the PCollectionList using the get method, as follows:
PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
...