mongoDB вставляется дважды при вызове в разных потоках - PullRequest
1 голос
/ 17 июня 2020

В основном я использую сообщения из весеннего облачного потока kafka и вставляю его в MongoDB. Мой код работает нормально, если мой кластер mon go работает. У меня 2 проблемы. В случае, если My Mon go Instance не работает

  1. автоматическая фиксация облачного потока отключена (autoCommitOffset имеет значение false), тогда также не происходит повторного опроса, даже если он еще не подтвердил сообщение
  2. при проверке на понедельник go соединение это займет некоторое время и в этот период времени, если он получит два сообщения с одним и тем же идентификатором, и после этого, если я запускаю экземпляр mon go, он дублирует сообщения, которые в нормальном случае работают нормально

Есть ли у нас какое-либо решение?

Вот мой код,

interface ResourceInventorySink {
companion object {
    const val INPUT = "resourceInventoryInput"
}
@Input(INPUT)
fun input(): SubscribableChannel

}

    @EnableBinding(ResourceInventorySink::class)
    class InventoryEventListeners {



   val logger = LoggerFactory.getLogger(javaClass)
    @Autowired
    lateinit var  resourceInventoryService : ResourceInventoryService


    @StreamListener(ResourceInventorySink.INPUT, condition = OperationConstants.INSERT)
    fun receiveInsert(event : Message<ResourceInventoryEvent>) {
        logger.info("received Insert message {}", event.payload.toString())
        val success = resourceInventoryService.insert(event.payload)
        success.subscribe({
            logger.info("Data Inserted", event.payload.toString())
            event.headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment::class.java)?.acknowledge()
        },{
            if(it !is DataAccessResourceFailureException) {
                logger.error("Exception Occured {} {}", it.message , it.cause.toString())
                event.headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment::class.java)?.acknowledge()
            }
            else {
                logger.error("Error Inserting in Mongo DB {}", it.cause)
            }

        })
    }

Вот мой класс обслуживания

@Service
class ResourceInventoryService() {

    val logger = LoggerFactory.getLogger(javaClass)

    @Autowired
    lateinit var  resourceInventoryRepository: ResourceInventoryRepository

    fun insert(newResource: ResourceInventoryEvent) = resourceInventoryRepository
                                                     .findByProductId(newResource.productId)
                                                     .switchIfEmpty(newResource.convertTODocument().toMono())
                                                     .flatMap { resourceInventoryRepository.save(it) }
                                                     .onErrorResume { Mono.error(it) }

это мое приложение .yml

spring:
  cloud:
    stream:
      default:
        consumer:
          useNativeEncoding: true
      kafka:
        binder:
          brokers:
            - localhost:9092
          consumer-properties:
            key.deserializer : org.apache.kafka.common.serialization.StringDeserializer
            value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema.registry.url: http://localhost:8081
            enable.auto.commit: false
            specific.avro.reader: true
        bindings:
          resourceInventoryInput:
            consumer:
              autoOffsetCommit: false
      default-binder: kafka
      bindings:
        resourceInventoryInput:
          binder: kafka
          destination: ${application.messaging.topic}
          content-type: application/*+avro
          group: ${application.messaging.group}

РЕДАКТИРОВАТЬ 1. Подтверждение равно нулю

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...