В основном я использую сообщения из весеннего облачного потока kafka и вставляю его в MongoDB. Мой код работает нормально, если мой кластер mon go работает. У меня 2 проблемы. В случае, если My Mon go Instance не работает
- автоматическая фиксация облачного потока отключена (autoCommitOffset имеет значение false), тогда также не происходит повторного опроса, даже если он еще не подтвердил сообщение
- при проверке на понедельник go соединение это займет некоторое время и в этот период времени, если он получит два сообщения с одним и тем же идентификатором, и после этого, если я запускаю экземпляр mon go, он дублирует сообщения, которые в нормальном случае работают нормально
Есть ли у нас какое-либо решение?
Вот мой код,
interface ResourceInventorySink {
companion object {
const val INPUT = "resourceInventoryInput"
}
@Input(INPUT)
fun input(): SubscribableChannel
}
@EnableBinding(ResourceInventorySink::class)
class InventoryEventListeners {
val logger = LoggerFactory.getLogger(javaClass)
@Autowired
lateinit var resourceInventoryService : ResourceInventoryService
@StreamListener(ResourceInventorySink.INPUT, condition = OperationConstants.INSERT)
fun receiveInsert(event : Message<ResourceInventoryEvent>) {
logger.info("received Insert message {}", event.payload.toString())
val success = resourceInventoryService.insert(event.payload)
success.subscribe({
logger.info("Data Inserted", event.payload.toString())
event.headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment::class.java)?.acknowledge()
},{
if(it !is DataAccessResourceFailureException) {
logger.error("Exception Occured {} {}", it.message , it.cause.toString())
event.headers.get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment::class.java)?.acknowledge()
}
else {
logger.error("Error Inserting in Mongo DB {}", it.cause)
}
})
}
Вот мой класс обслуживания
@Service
class ResourceInventoryService() {
val logger = LoggerFactory.getLogger(javaClass)
@Autowired
lateinit var resourceInventoryRepository: ResourceInventoryRepository
fun insert(newResource: ResourceInventoryEvent) = resourceInventoryRepository
.findByProductId(newResource.productId)
.switchIfEmpty(newResource.convertTODocument().toMono())
.flatMap { resourceInventoryRepository.save(it) }
.onErrorResume { Mono.error(it) }
это мое приложение .yml
spring:
cloud:
stream:
default:
consumer:
useNativeEncoding: true
kafka:
binder:
brokers:
- localhost:9092
consumer-properties:
key.deserializer : org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
enable.auto.commit: false
specific.avro.reader: true
bindings:
resourceInventoryInput:
consumer:
autoOffsetCommit: false
default-binder: kafka
bindings:
resourceInventoryInput:
binder: kafka
destination: ${application.messaging.topic}
content-type: application/*+avro
group: ${application.messaging.group}
РЕДАКТИРОВАТЬ 1. Подтверждение равно нулю