Можно ли использовать составные триггеры в сочетании с микропакетами с потоком данных? - PullRequest
0 голосов
/ 21 апреля 2020

У нас есть неограниченный PCollection PCollection<TableRow> source, который мы вставляем в BigQuery.

Простой "по книге" способ выстрелить windows каждые 500 тысяч сообщений или пять минут будет:

source.apply("GlobalWindow", Window.<TableRow>into(new GlobalWindows())
    .triggering(Repeatedly.forever(AfterFirst.of(
         AfterPane.elementCountAtLeast(500000),
         AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(5))))
     ).withAllowedLateness(Duration.standardMinutes(1440)).discardingFiredPanes())

Можно подумать, что применение следующего к запущенному окну / панели позволит вам записать содержимое запущенной панели в BigQuery:

.apply("BatchWriteToBigQuery", BigQueryIO.writeTableRows()
.to(destination)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withNumFileShards(NUM_FILE_SHARDS)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

Но это приведет к ошибке компиляции An exception occured while executing the Java class. When writing an unbounded PCollection via FILE_LOADS, triggering frequency must be specified

Относительно простым решением было бы добавить .withTriggeringFrequency(Duration.standardMinutes(5)) к вышеприведенному, что, по сути, дало бы идею вставки либо каждые пять минут или каждые N сообщения полностью аннулируются, и в любом случае вы могли бы также избавиться от оконного управления в этом случае.

Есть ли способ действительно выполнить sh это?

1 Ответ

0 голосов
/ 21 апреля 2020

FILE_LOADS требуется частота срабатывания.

Если вы хотите больше результатов в реальном времени, вы можете использовать STREAMING_INSERTS

Ссылка https://beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.Method.html#FILE_LOADS

...