Интересный вопрос, некоторые хорошие идеи уже есть, но я хотел бы показать еще одну возможность только с Dataflow и BigQuery. Если это пакетное задание без шаблонов, мы можем использовать PipelineResult.waitUntilFinish()
, который:
Ожидает завершения конвейера и возвращает окончательное состояние.
Затем мы проверяем, является ли State
равным DONE
, и при необходимости используем оператор MERGE
:
PipelineResult res = p.run();
res.waitUntilFinish();
if (res.getState() == PipelineResult.State.DONE) {
LOG.info("Dataflow job is finished. Merging results...");
MergeResults();
LOG.info("All done :)");
}
Чтобы проверить это, мы можем создать таблицу BigQuery. (upsert.full
), который будет содержать окончательные результаты и будет обновляться при каждом запуске:
bq mk upsert
bq mk -t upsert.full name:STRING,total:INT64
bq query --use_legacy_sql=false "INSERT upsert.full (name, total) VALUES('tv', 10), ('laptop', 20)"
в начале мы заполним его total
из 10 телевизоров. Но теперь давайте представим, что мы продаем 5 дополнительных телевизоров, и в нашем задании Dataflow мы запишем одну строку во временную таблицу (upsert.temp
) с новым исправленным значением (15):
p
.apply("Create Data", Create.of("Start"))
.apply("Write", BigQueryIO
.<String>write()
.to(output)
.withFormatFunction(
(String dummy) ->
new TableRow().set("name", "tv").set("total", 15))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withSchema(schema));
Теперь мы хотим обновить исходную таблицу следующим запросом ( синтаксис DML ):
MERGE upsert.full F
USING upsert.temp T
ON T.name = F.name
WHEN MATCHED THEN
UPDATE SET total = T.total
WHEN NOT MATCHED THEN
INSERT(name, total)
VALUES(name, total)
Поэтому мы можем использовать клиентскую библиотеку BigQuery Java в MergeResults
:
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(
"MERGE upsert.full F "
+ ...
+ "VALUES(name, total)")
.setUseLegacySql(false)
.build();
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
Это основано на этом фрагменте , который включает некоторую базовую c обработку ошибок. Обратите внимание, что вам нужно добавить это к вашему pom.xml
или эквивалентному:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>1.82.0</version>
</dependency>
, и это работает для меня:
INFO: 2020-02-08T11:38:56.292Z: Worker pool stopped.
Feb 08, 2020 12:39:04 PM org.apache.beam.runners.dataflow.DataflowPipelineJob logTerminalState
INFO: Job 2020-02-08_REDACTED finished with status DONE.
Feb 08, 2020 12:39:04 PM org.apache.beam.examples.BigQueryUpsert main
INFO: Dataflow job is finished. Merging results...
Feb 08, 2020 12:39:09 PM org.apache.beam.examples.BigQueryUpsert main
INFO: All done :)
$ bq query --use_legacy_sql=false "SELECT name,total FROM upsert.full LIMIT 10"
+--------+-------+
| name | total |
+--------+-------+
| tv | 15 |
| laptop | 20 |
+--------+-------+
Протестировано с 2.17.0 Java SDK, а также прямой и поток данных.
Полный пример здесь