Консолидация последнего значения заполнения одной строки из PCollection записей CDC с использованием потока данных Apache Beam - PullRequest
0 голосов
/ 23 октября 2019

Записи записи изменений (CDC) не будут иметь всех значений для столбцов в записи. Возможно, что для первичного ключа записи, скажем, R1, мы можем иметь PCollection записей CDC для R1 с меткой времени CDC.
Отл. Если запись R1 имеет столбцы C1, C2, C3, C4, CDCTimestamp

CDC, записи будут
R1, C1.1, -, C3.1, -, 10:02
R1, C1.2, C2.2, -, C4.2,10: 03
R1, -, C2.3, -, C4.3,10: 04
R2, C2.1,-, C3.1, -, 10:03

Когда процессы конвейерного луча обрабатываются, мне нужно получить вывод следующим образом PCollection, содержащий R1, C1.2, C2.3, C3.1, C4.3,10: 04
R2, C2.1, -, C3.1, -, 10:03

Любые указатели приветствуются! Спасибо.

Ответы [ 2 ]

1 голос
/ 23 октября 2019

Не уверен, правильно ли я понимаю ваш вопрос.

Как насчет

1) сначала преобразовать его в пары KV:

R1: C1.1,--,C3.1,--,10:02
R1: C1.2,C2.2,--,C4.2,10:03
R1: --,C2.3,--,C4.3,10:04
R2: C2.1,--,C3.1,--,10:03

2) Затем выполнить GroupBykey:

R1-> C1.1,--,C3.1,--,10:02
     C1.2,C2.2,--,C4.2,10:03
     --,C2.3,--,C4.3,10:04
R2-> C2.1,--,C3.1,--,10:03

3)Затем для каждого ключа обработайте несколько элементов и преобразуйте их в один.

0 голосов
/ 25 октября 2019

Благодаря Руоюн Хуангу, который указал мне правильное направление ... Прикрепление псевдо-кода, если кто-то еще заинтересован в решении аналогичной проблемы.

PCollection<TableRow> rowsFromBigQuery =
           pipeline.apply(
                BigQueryIO.readTableRows()
                    .fromQuery(QUERY)
                    .withoutValidation()
                    .usingStandardSql()
                   .withMethod(Method.DIRECT_READ));
                   //.withReadOptions(tableReadOptions));
// Seggregate by keys
PCollection<KV<String, TableRow>> rowByKeys =
            rowsFromBigQuery.apply("Seggregating by row keys", ParDo.of(new CreateKeyValue()));
// Group by keys
PCollection<KV<String, Iterable<TableRow>>> groupByKeys =
            rowByKeys.apply("Grouping by row keys", GroupByKey.<String, TableRow>create());
// Consolidate CDC
PCollection<TableRow> cdcConsolidatedRows =
            groupByKeys.apply("Consolidate CDC by row keys", ParDo.of(new ConsolidateCDC()));

Подписи для CreateKeyValue и ConsolidateCDC следующие:

static class ConsolidateCDC extends DoFn <KV<String, Iterable<TableRow>>, TableRow> and static class CreateKeyValue extends DoFn<TableRow, KV<String, TableRow>>
...