Как объединить таблицы BQ для двух или более ключей с облачным потоком данных? - PullRequest
0 голосов
/ 04 мая 2018

У меня есть две таблицы A и B. Обе они имеют поля session_id и cookie_id. Как мне создать выход объединенной таблицы, соединяющей A с B на session_id, cookie_id с помощью конвейера потока данных? CoGroupByKey метод позволяет вам присоединиться к одному ключу. Не смог найти ничего полезного и в документации.

Ответы [ 2 ]

0 голосов
/ 08 мая 2018

Чтобы расширить ответ пользователя 9720010. Вы можете создать составной ключ, сопоставив поля с комбинацией session_id и cookie_id. Этот шаблон объясняется в общих шаблонах сценариев использования потока данных blog . Предполагая, что вы используете BigQuery, вы можете сделать что-то похожее на следующее:

Pipeline pipeline = Pipeline.create(options);

// Create tuple tags for the value types in each collection.
final TupleTag<TableRow> table1Tag = new TupleTag<>();
final TupleTag<TableRow> table2Tag = new TupleTag<>();

// Transform for keying table rows by session_id and cookie_id
WithKeys<String, TableRow> sessionAndCookieKeys = WithKeys.of(
    (TableRow row) ->
        String.format("%s#%s",
            row.get("session_id"),
            row.get("cookie_id")))
    .withKeyType(TypeDescriptors.strings());

/*
 * Steps:
 *  1) Read table 1's rows
 *  2) Read table 2's rows
 *  3) Map each row to a composite key
 *  4) Join on the composite key
 *  5) Process the results
 */
PCollection<KV<String, TableRow>> table1Rows = pipeline
    .apply(
        "ReadTable1",
        BigQueryIO
            .readTableRows()
            .from(options.getTable1()))
    .apply("WithKeys", sessionAndCookieKeys);

PCollection<KV<String, TableRow>> table2Rows = pipeline
    .apply(
        "ReadTable2",
        BigQueryIO
            .readTableRows()
            .from(options.getTable2()))
    .apply("WithKeys", sessionAndCookieKeys);

//Merge collection values into a CoGbkResult collection
PCollection<KV<String, CoGbkResult>> coGbkResult = KeyedPCollectionTuple
    .of(table1Tag, table1Rows)
    .and(table2Tag, table2Rows)
    .apply("JoinOnSessionAndCookie", CoGroupByKey.create());

// Process the results
coGbkResult.apply(
    "ProcessResults", 
    ParDo.of(new DoFn<KV<String, CoGbkResult>, Object>() {
      @ProcessElement
      public void processElement(ProcessContext context) {
        // Do something here
      }
    }));
0 голосов
/ 07 мая 2018

Один из подходов, которым я следую в таких ситуациях, - это создание специального ключа, который представляет собой комбинацию двух ключей. После чтения данных при преобразовании в пару ключ-значение я бы вывел session_id $ cookie_id в виде единой объединенной строки. Здесь $ может быть любым разделителем, который не образует кодировку двух ключей. Разделитель также можно игнорировать.

...