При условии, что вы просто хотите отфильтровать дубликаты при успешном разборе событий.Вам нужно будет добавить код после этой строки :
transformOut
.get(TRANSFORM_OUT)
.apply("keyed", WithKeys.of(/* choose your key from table row to identify duplicates */))
.apply(GroupByKey.create())
.apply("dedup", ParDo.of(new DoFn<KV<String, Iterable<TableRow>>, TableRow>() {
public void ProcessElement(ProcessContext context) {
// only output one element from list to dedup.
context.output(context.element().getValue().iterator().next());
}
}
))
.apply(Window.configure().triggering(/* choose your trigger */)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.to(options.getOutputTableSpec()));
Фактически BeamSQL пытается поддержать ваш вариант использования (проверьте PubsubToBigqueryIT.java ).BeamSQL позволяет создавать таблицы на тему pubsub и таблицы больших запросов.Чтение из pubsub, преобразование сообщений pubsub и запись в таблицу BQ уже обрабатываются BeamSQL.SQL можно применять к данным, которые читаются из pubsub.Однако в BeamSQL могут отсутствовать некоторые функции (например, функция агрегации ANY_VALUE, если вы хотите использовать group by для дедупликации в SQL) для завершения вашей задачи.