Google DataFlow: чтение из BigQuery, объединение трех строковых полей, запись полей ключа / значения в Google Cloud Spanner - PullRequest
1 голос
/ 23 сентября 2019

Ни один из предоставленных шаблонов DataFlow не соответствует тому, что мне нужно сделать, поэтому я пытаюсь написать свой собственный.Мне удалось без проблем запустить пример кода, например пример подсчета слов, поэтому я попытался собрать воедино отдельные части примеров, которые читаются из BigQuery и пишут в Spanner, но в исходном коде так много вещей, которые я не понимаю и не могу приспособиться к ним.Моя собственная проблема.

Я ДЕЙСТВИТЕЛЬНО потерялся в этом, и любая помощь очень ценится!

Цель состоит в том, чтобы использовать DataFlow и Apache Beam SDK для чтения из таблицы BigQuery с 3 строковыми полями.и 1 целочисленное поле, затем объединить содержимое 3 строковых полей в одну строку и поместить эту новую строку в новое поле с именем «ключ», затем я хочу записать поле ключа и целочисленное поле (которое не изменилось) вТаблица гаечного ключа, которая уже существует, в идеале добавляет строки с новым ключом и обновляет целочисленное поле строк с ключом, который уже существует.

Я пытаюсь сделать это в Java, потому что нет соединителя ввода / выводадля Python.Любые советы по работе с Python приветствуются.

Пока я был бы очень рад, если бы я мог просто читать таблицу из BigQuery и записывать все, что получаю из этой таблицы, в таблицу в Spanner, но я могуэтого даже не произошло.

Проблемы:

  • Я использую Maven и не знаю, какие зависимости мне нужно поместить в файл pom
  • Я не знаю, какой пакет и импорт мне нужен в начале моего Java-файла
  • Я не знаю, следует ли мне использовать readTableRows () или read (SerializableFunction) для чтенияот BigQuery
  • Я понятия не имею, как получить доступ к строковым полям в PCollection, чтобы объединить их или как сделать новую PCollection только с полем ключа и целого числа
  • Мне как-то нужно превратить PCollection в мутацию для записи в Spanner
  • Я хочу использовать запрос INSERT UPDATE для записи в таблицу Spanner, которая, по-видимому, не подходит для Spanner.разъем ввода / вывода.

Честно говоря, я слишком смущен, чтобы даже показать тот код, который я пытаюсь запустить.

public class SimpleTransfer {

    public static void main(String[] args) {
        // Create and set your PipelineOptions.
        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

        // For Cloud execution, set the Cloud Platform project, staging location, and specify DataflowRunner.
        options.setProject("myproject");
        options.setStagingLocation("gs://mybucket");
        options.setRunner(DataflowRunner.class);

        // Create the Pipeline with the specified options.
        Pipeline p = Pipeline.create(options);

        String tableSpec = "database.mytable";

        // read whole table from bigquery
        rowsFromBigQuery =
            p.apply(
                BigQueryIO.readTableRows()
                    .from(tableSpec);

        // Hopefully some day add a transform

        // Somehow make a Mutation
        PCollection<Mutation> mutation = rowsFromBigQuery;

        // Only way I found to write to Spanner, not even sure if that works.
        SpannerWriteResult result = mutation.apply(
            SpannerIO.write().withInstanceId("myinstance").withDatabaseId("mydatabase").grouped());

        p.run().waitUntilFinish();

    }
}

1 Ответ

0 голосов
/ 24 сентября 2019

Страшно иметь дело с этими странными типами данных, но как только вы привыкнете к типам TableRow и Mutation, вы сможете кодировать надежные конвейеры.

Первое, что вам нужночтобы сделать это, возьмите PCollection из TableRow с и конвертируйте их в промежуточный формат, который вам удобен.Давайте используем Beam's KV, который определяет пару ключ-значение.В следующем фрагменте мы извлекаем значения из TableRow и объединяем нужную строку:

rowsFromBigQuery
            .apply(
                MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings()
                                                     TypeDescriptors.integers()))
                    .via(tableRow -> KV.of(
                               (String) tableRow.get("myKey1")
                               + (String) tableRow.get("myKey2")
                               + (String) tableRow.get("myKey3"),
                               (Integer) tableRow.get("myIntegerField"))))

Наконец, для записи в Spanner мы используем объекты типа Mutation, которыеОпределите вид мутации, которую мы хотим применить к строке в Spanner.Мы сделаем это с другим преобразованием MapElements, которое принимает N входов и возвращает N выходов.Мы определяем там вставку или обновляем мутации:

myKvPairsPCollection
            .apply(
                MapElements.into(TypeDescriptor.of(Mutation.class))
                    .via(elm -> Mutation.newInsertOrUpdateBuilder("myTableName)
                                    .set("key").to(elm.getKey())
                                    .set("value").to(elm.getValue()));

И затем вы можете передать результат в SpannerIO.write.Весь конвейер выглядит примерно так:

        Pipeline p = Pipeline.create(options);

        String tableSpec = "database.mytable";

        // read whole table from bigquery
        PCollection<TableRow> rowsFromBigQuery =
            p.apply(
                BigQueryIO.readTableRows().from(tableSpec));

        // Take in a TableRow, and convert it into a key-value pair
        PCollection<Mutation> mutations = rowsFromBigQuery
            // First we make the TableRows into the appropriate key-value
            // pair of string key and integer.
            .apply(
                MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings()
                                                     TypeDescriptors.integers()))
                    .via(tableRow -> KV.of(
                               (String) tableRow.get("myKey1")
                               + (String) tableRow.get("myKey2")
                               + (String) tableRow.get("myKey3"),
                               (Integer) tableRow.get("myIntegerField"))))
            // Now we construct the mutations
            .apply(
                MapElements.into(TypeDescriptor.of(Mutation.class))
                    .via(elm -> Mutation.newInsertOrUpdateBuilder("myTableName)
                                    .set("key").to(elm.getKey())
                                    .set("value").to(elm.getValue()));

        // Now we pass the mutations to spanner
        SpannerWriteResult result = mutations.apply(
            SpannerIO.write()
                    .withInstanceId("myinstance")
                    .withDatabaseId("mydatabase").grouped());

        p.run().waitUntilFinish();

    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...