Как избежать дубликатов в BigQuery путем потоковой передачи с Apache Beam IO? - PullRequest
0 голосов
/ 07 июня 2019

Мы используем довольно простой поток, когда сообщения извлекаются из PubSub, их содержимое JSON распределяется по двум типам (для BigQuery и Postgres) и затем вставляется в оба приемника. Но мы видим дубликаты в обоих приемниках (Postgres был как бы исправлен с уникальным ограничением и "ON CONFLICT ... DO NOTHING").

Сначала мы доверяли предположительно UUId "insertId", который создает Apache Beam / BigQuery. Затем мы добавляем атрибут «unique_label» к каждому сообщению, прежде чем ставить их в очередь в PubSub, используя данные из самого JSON, что придает им уникальность (device_id + метка времени чтения). И подписался на тему, используя этот атрибут с методом withIdAttribute. Наконец мы заплатили за поддержку GCP, и их «решения» не работают. Они сказали нам даже использовать преобразование Reshuffle, которое, кстати, устарело, и некоторые оконные функции (чего мы не будем делать, поскольку нам нужны данные почти в реальном времени).

Это основной поток, довольно простой: [ОБНОВЛЕНО С ПОСЛЕДНИМ КОДОМ] Трубопроводный

        val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(OptionArgs::class.java)
        val pipeline = Pipeline.create(options)
        var mappings = ""

        // Value only available at runtime
        if (options.schemaFile.isAccessible){
            mappings = readCloudFile(options.schemaFile.get())
        }

        val tableRowMapper = ReadingToTableRowMapper(mappings)
        val postgresMapper = ReadingToPostgresMapper(mappings)

        val pubsubMessages =
            pipeline
            .apply("ReadPubSubMessages",
                PubsubIO
                    .readMessagesWithAttributes()
                    .withIdAttribute("id_label")
                    .fromTopic(options.pubSubInput))

        pubsubMessages
            .apply("AckPubSubMessages", ParDo.of(object: DoFn<PubsubMessage, String>() {
                @ProcessElement
                fun processElement(context: ProcessContext) {
                    LOG.info("Processing readings: " + context.element().attributeMap["id_label"])
                    context.output("")
                }
            }))

        val disarmedMessages =
            pubsubMessages
                .apply("DisarmedPubSubMessages",
                    DisarmPubsubMessage(tableRowMapper, postgresMapper)
                )

        disarmedMessages
            .get(TupleTags.readingErrorTag)
            .apply("LogDisarmedErrors", ParDo.of(object: DoFn<String, String>() {
                @ProcessElement
                fun processElement(context: ProcessContext) {
                    LOG.info(context.element())
                    context.output("")
                }
            }))

        disarmedMessages
            .get(TupleTags.tableRowTag)
            .apply("WriteToBigQuery",
                BigQueryIO
                    .writeTableRows()
                    .withoutValidation()
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
                    .to(options.bigQueryOutput)
            )

        pipeline.run()

DissarmPubsubMessage - это PTransforms, который использует преобразование FlatMapElements для получения TableRow и ReadingsInputFlatten (собственный класс для Postgres)

Мы ожидаем ноль дубликатов или «лучшее усилие» (и мы добавляем некоторую работу по очистке хрон), мы заплатили за эти продукты для запуска статистики и анализа больших данных ...

[ОБНОВЛЕНИЕ 1] Я даже добавляю новое простое преобразование, которое регистрирует наш уникальный атрибут через ParDo, который предположительно должен подтвердить PubsubMessage, но это не так:

новый поток с шагом AckPubSubMessages

Спасибо !!

1 Ответ

0 голосов
/ 07 июня 2019

Похоже, вы используете глобальное окно.Одним из способов было бы поместить это окно в N-минутное окно.Затем обработайте ключи в окне и бросьте элементы с двойными ключами.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...