Мы используем довольно простой поток, когда сообщения извлекаются из PubSub, их содержимое JSON распределяется по двум типам (для BigQuery и Postgres) и затем вставляется в оба приемника.
Но мы видим дубликаты в обоих приемниках (Postgres был как бы исправлен с уникальным ограничением и "ON CONFLICT ... DO NOTHING").
Сначала мы доверяли предположительно UUId "insertId", который создает Apache Beam / BigQuery.
Затем мы добавляем атрибут «unique_label» к каждому сообщению, прежде чем ставить их в очередь в PubSub, используя данные из самого JSON, что придает им уникальность (device_id + метка времени чтения). И подписался на тему, используя этот атрибут с методом withIdAttribute.
Наконец мы заплатили за поддержку GCP, и их «решения» не работают. Они сказали нам даже использовать преобразование Reshuffle, которое, кстати, устарело, и некоторые оконные функции (чего мы не будем делать, поскольку нам нужны данные почти в реальном времени).
Это основной поток, довольно простой:
[ОБНОВЛЕНО С ПОСЛЕДНИМ КОДОМ]
Трубопроводный
val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(OptionArgs::class.java)
val pipeline = Pipeline.create(options)
var mappings = ""
// Value only available at runtime
if (options.schemaFile.isAccessible){
mappings = readCloudFile(options.schemaFile.get())
}
val tableRowMapper = ReadingToTableRowMapper(mappings)
val postgresMapper = ReadingToPostgresMapper(mappings)
val pubsubMessages =
pipeline
.apply("ReadPubSubMessages",
PubsubIO
.readMessagesWithAttributes()
.withIdAttribute("id_label")
.fromTopic(options.pubSubInput))
pubsubMessages
.apply("AckPubSubMessages", ParDo.of(object: DoFn<PubsubMessage, String>() {
@ProcessElement
fun processElement(context: ProcessContext) {
LOG.info("Processing readings: " + context.element().attributeMap["id_label"])
context.output("")
}
}))
val disarmedMessages =
pubsubMessages
.apply("DisarmedPubSubMessages",
DisarmPubsubMessage(tableRowMapper, postgresMapper)
)
disarmedMessages
.get(TupleTags.readingErrorTag)
.apply("LogDisarmedErrors", ParDo.of(object: DoFn<String, String>() {
@ProcessElement
fun processElement(context: ProcessContext) {
LOG.info(context.element())
context.output("")
}
}))
disarmedMessages
.get(TupleTags.tableRowTag)
.apply("WriteToBigQuery",
BigQueryIO
.writeTableRows()
.withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
.to(options.bigQueryOutput)
)
pipeline.run()
DissarmPubsubMessage - это PTransforms, который использует преобразование FlatMapElements для получения TableRow и ReadingsInputFlatten (собственный класс для Postgres)
Мы ожидаем ноль дубликатов или «лучшее усилие» (и мы добавляем некоторую работу по очистке хрон), мы заплатили за эти продукты для запуска статистики и анализа больших данных ...
[ОБНОВЛЕНИЕ 1]
Я даже добавляю новое простое преобразование, которое регистрирует наш уникальный атрибут через ParDo, который предположительно должен подтвердить PubsubMessage, но это не так:
новый поток с шагом AckPubSubMessages
Спасибо !!