Облачное хранилище в BigQuery (upsert) через DataFlow - PullRequest
2 голосов
/ 05 февраля 2020

Всякий раз, когда файл записывается в облачное хранилище, я хочу, чтобы он запускал облачную функцию, которая выполняет шаблон DataFlow для преобразования содержимого файла и записи результатов в BigQuery.

Я думаю, что получил дескриптор, который много по большей части. Но проблема в том, что мне не нужно просто вставлять в таблицу BQ, мне нужно сохранить (с помощью операции слияния). Кажется, что это было бы общим требованием, но соединитель Apache Beam BQ не предлагает эту опцию (только запись, создание и усечение / запись).

Итак, я подумал ... ОК, если бы я мог просто захватить, когда конвейер DataFlow завершил выполнение, я мог бы сделать запись DataFlow во временную таблицу, а затем я мог бы вызвать запрос SQL Merge для объединения данных из временной таблицы в целевую таблицу. Однако я не вижу никакого способа вызвать облачную функцию после завершения выполнения конвейера.

Любые предложения о том, как достичь sh конечной цели?

Спасибо

Ответы [ 3 ]

1 голос
/ 06 февраля 2020

Я реализовал точный вариант использования, но вместо использования 2 разных конвейеров, вы можете просто создать 1 конвейер.

Шаг 1: Считать файл из gcs и преобразовать его в TableRow.

Шаг 2: Считать всю строку из BigQuery.

Шаг 3: Создать 1 пардо, где у вас есть Ваша пользовательская операция вставки, как показано ниже:

PCollection<KV<String,TableRow>> val = p.apply(BigQueryIO.readTableRows().from(""));

PCollection<KV<String,TableRow>> val1 = p.apply(TextIO.read().from("")).apply(Convert to TableRow()));

Шаг 4: Выполните CoGroupByKey и выполните pardo поверх этого результата, чтобы получить обновленную (эквивалентную MERGE OPERATION).

Шаг 5 : Вставить полный TableRow в BQ, используя режим WRITE_TRUNCATE. Здесь часть кода будет немного сложнее, но это будет лучше при использовании одного конвейера.

1 голос
/ 08 февраля 2020

Интересный вопрос, некоторые хорошие идеи уже есть, но я хотел бы показать еще одну возможность только с Dataflow и BigQuery. Если это пакетное задание без шаблонов, мы можем использовать PipelineResult.waitUntilFinish(), который:

Ожидает завершения конвейера и возвращает окончательное состояние.

Затем мы проверяем, является ли State равным DONE, и при необходимости используем оператор MERGE:

PipelineResult res = p.run();
res.waitUntilFinish();

if (res.getState() == PipelineResult.State.DONE) {
    LOG.info("Dataflow job is finished. Merging results...");
    MergeResults();
    LOG.info("All done :)");
}

Чтобы проверить это, мы можем создать таблицу BigQuery. (upsert.full), который будет содержать окончательные результаты и будет обновляться при каждом запуске:

bq mk upsert
bq mk -t upsert.full name:STRING,total:INT64
bq query --use_legacy_sql=false "INSERT upsert.full (name, total) VALUES('tv', 10), ('laptop', 20)"

в начале мы заполним его total из 10 телевизоров. Но теперь давайте представим, что мы продаем 5 дополнительных телевизоров, и в нашем задании Dataflow мы запишем одну строку во временную таблицу (upsert.temp) с новым исправленным значением (15):

p
.apply("Create Data", Create.of("Start"))
.apply("Write", BigQueryIO
                .<String>write()
                .to(output)
                .withFormatFunction(
                    (String dummy) ->
                    new TableRow().set("name", "tv").set("total", 15))
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withSchema(schema));

Теперь мы хотим обновить исходную таблицу следующим запросом ( синтаксис DML ):

MERGE upsert.full F
USING upsert.temp T
ON T.name = F.name
WHEN MATCHED THEN
  UPDATE SET total = T.total
WHEN NOT MATCHED THEN
  INSERT(name, total)
  VALUES(name, total)

Поэтому мы можем использовать клиентскую библиотеку BigQuery Java в MergeResults:

BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
QueryJobConfiguration queryConfig =
    QueryJobConfiguration.newBuilder(
          "MERGE upsert.full F "
        + ...
        + "VALUES(name, total)")
        .setUseLegacySql(false)
        .build();

JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

Это основано на этом фрагменте , который включает некоторую базовую c обработку ошибок. Обратите внимание, что вам нужно добавить это к вашему pom.xml или эквивалентному:

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-bigquery</artifactId>
  <version>1.82.0</version>
</dependency>

, и это работает для меня:

INFO: 2020-02-08T11:38:56.292Z: Worker pool stopped.
Feb 08, 2020 12:39:04 PM org.apache.beam.runners.dataflow.DataflowPipelineJob logTerminalState
INFO: Job 2020-02-08_REDACTED finished with status DONE.
Feb 08, 2020 12:39:04 PM org.apache.beam.examples.BigQueryUpsert main
INFO: Dataflow job is finished. Merging results...
Feb 08, 2020 12:39:09 PM org.apache.beam.examples.BigQueryUpsert main
INFO: All done :)
$ bq query --use_legacy_sql=false "SELECT name,total FROM upsert.full LIMIT 10"
+--------+-------+
|  name  | total |
+--------+-------+
| tv     |    15 |
| laptop |    20 |
+--------+-------+

Протестировано с 2.17.0 Java SDK, а также прямой и поток данных.

Полный пример здесь

1 голос
/ 06 февраля 2020

Не существует встроенного встроенного решения для генерации события в конце задания потока данных. Тем не менее, вы можете обмануть благодаря журналам.

Для этого:

  • Go для журналов, выберите расширенный фильтр (стрелка справа от панели фильтров) и вставьте этот пользовательский фильтр:
resource.type="dataflow_step" textPayload="Worker pool stopped."

Вы должны видеть только ваш конец потока данных. Затем вы должны создать приемник в PubSub этого результата. Затем вам нужно подключить свою функцию к этим сообщениям PubSub, и вы можете делать все, что вы хотите.

Для этого, после заполнения вашего пользовательского фильтра

  • Нажмите кнопку Создать приемник
  • Установить имя приемника
  • Установить пункт назначения PubSub
  • Выберите свой топи c
  • Теперь подключите функцию к этому топи c, это будет срабатывать только в конце потока данных.
...