Apache Beam PubSubToBigQuery.java удаление дубликатов? - PullRequest
0 голосов
/ 10 февраля 2019

Я использую код PubSubToBigQuery.java без каких-либо изменений.Кто-нибудь, пожалуйста, покажите мне, как удалить дубликаты записей во время этого процесса?

Я знаю, что хитрость заключается в создании Window и использовании GroupBy, но на самом деле не знаю, как его написать.

Спасибо

1 Ответ

0 голосов
/ 11 февраля 2019

При условии, что вы просто хотите отфильтровать дубликаты при успешном разборе событий.Вам нужно будет добавить код после этой строки :

transformOut
    .get(TRANSFORM_OUT)
    .apply("keyed", WithKeys.of(/* choose your key from table row to identify duplicates */))
    .apply(GroupByKey.create())
    .apply("dedup", ParDo.of(new DoFn<KV<String, Iterable<TableRow>>, TableRow>() {
      public void ProcessElement(ProcessContext context) {
        // only output one element from list to dedup.
        context.output(context.element().getValue().iterator().next());
      }
    }
    ))
    .apply(Window.configure().triggering(/* choose your trigger */)
    .apply(
        "WriteSuccessfulRecords",
        BigQueryIO.writeTableRows()
            .withoutValidation()
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .to(options.getOutputTableSpec()));

Фактически BeamSQL пытается поддержать ваш вариант использования (проверьте PubsubToBigqueryIT.java ).BeamSQL позволяет создавать таблицы на тему pubsub и таблицы больших запросов.Чтение из pubsub, преобразование сообщений pubsub и запись в таблицу BQ уже обрабатываются BeamSQL.SQL можно применять к данным, которые читаются из pubsub.Однако в BeamSQL могут отсутствовать некоторые функции (например, функция агрегации ANY_VALUE, если вы хотите использовать group by для дедупликации в SQL) для завершения вашей задачи.

...