Как удалить вложенную коллекцию PC в потоке данных - PullRequest
0 голосов
/ 05 апреля 2019

Чтобы объединить две вложенные структуры PCollection, нам нужно отсоединить PCollection перед выполнением объединения, чтобы получить вызовы (см. Мой другой случай стекопотока ссылка ). Так что хочу знать, как отменить сбор PCollection. Было бы хорошо, если бы кто-то дал идею, или Присоединитесь к двум вложенным таблицам, или как раскрутить PCollections.

Я только что заметил, что у нас есть PTransform "Unnest" ( link ) для удаления коллекции из вложенной. Но я не мог найти образец в сети. Однако я просто попытался реализовать его с помощью следующих шагов для преобразования вложенной коллекции, но все еще не смог получить последнюю коллекцию в последней.

1) PCollection empCollection = ReadCollection (); 2) Используя функцию Pardo, преобразуйте значение из PCollection (com.google.api.services.bigquery.model.TableRow) в PCollection (org.apache.beam.sdk.values.Row). 3) Определите схему, как показано ниже Проекты схемы = Schema.builder (). AddInt32Field ("Id"). AddStringField ("Имя"). Build (); Schema Employees = Schema.builder (). AddStringField ("empNo"). AddStringField ("empName"). AddArrayField ("Проекты", FieldType.row (projects)). Build (); 4) Используйте преобразование Unnest для удаления вложенной коллекции

PCollection<Row> pcColl = targetRowCollection.apply(Unnest.<Row>create().withFieldNameFunction(new SerializableFunction<java.util.List<java.lang.String>, java.lang.String>() {
@Override
public java.lang.String apply(java.util.List<java.lang.String> input) {
    return String.join("+", input);
    }
}));

5) Используя функцию Pardo, преобразуйте значение из PCollection (org.apache.beam.sdk.values.Row) в PCollection (com.google.api.services.bigquery.model.TableRow)

Может кто-нибудь мне помочь, используя это преобразование Unnest для преобразования коллекции гнезда из вложенной коллекции.

1 Ответ

0 голосов
/ 11 июня 2019

код для объединения двух Pcollection с вложенной структурой в python с Beam:

with beam.Pipeline(options=option) as p:

    source_record1 =  p | "get data1" >> beam.io.avroio.ReadFromAvro(input_file1)
    source_record2 =  p | "get data2" >> beam.io.avroio.ReadFromAvro(input_file2)

    #convert into <k,v> form
    keyed_record1 = source_record1 | beam.ParDo(addkeysnested(),join_fileld_names1)
    keyed_record2 = source_record2 | beam.ParDo(addkeysnested(),join_fileld_names2)

    #Apply join operation
    rjoin = ({'File1Info': keyed_record1, 'File2Info': keyed_record2}                     
               | beam.CoGroupByKey())


    class addkeysnested(beam.DoFn):
        def process(self,element,fieldName):
            tmp_record = element    
            fieldName = fieldName.split(".")
            for i in range(len(fieldName)):

                if i != len(fieldName) - 1 :
                    tmp_record = tmp_record[fieldName[i].strip()][0]

                else:
                    tmp_record = tmp_record[fieldName[i].strip()]   

        return [(tmp_record,element)]

Примечание: в приведенном выше коде мы можем получить значение ключа на любом уровне вложенных полей, то есть personalInfo.Address.City, после этого применитьCoGroupByKey () для объединения двух коллекций

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...