у меня есть 6 брокеров kafka: Kafka1.fat: 9092, kafka2.fat: 9092, kafka3.fat: 9092, kafka4.fat: 9092, kafka5.fat: 9092, kafka6.fat: 9092, и я настроил около 30темы, используя два контейнераFactory.Разница между ними заключается в том, что один является однопоточным, а соответствующая тема занимает 30 разделов по умолчанию.Другой контейнерный завод состоит из 6 потоков с соответствующими темами для 6 разделов.Всегда работает нормально.Но в последнее время kafka1.fat не работает, и тогда у меня есть тема, которую я не могу получить сообщение.Посредством отладки я обнаружил, что тема однопоточных 30 разделов не может получить сообщение, а 6 разделов и 6 потоков работают нормально.Неработающий один из шести посредников не должен влиять на нормальный прием сообщений, почему некоторые темы не будут получать сообщение, есть ли проблемы с моей конфигурацией?
Вот некоторые из моих фрагментов кода
@Bean
fun singleContainerFactory(consumerFactory : ConsumerFactory<Any,Any>): KafkaListenerContainerFactory<*> {
val containerFactory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
containerFactory.consumerFactory = consumerFactory
containerFactory.setConcurrency(1)
containerFactory.isBatchListener = true //批量消费
containerFactory.containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE
return containerFactory
}
однопоточная конфигурация
class FileCountReceive: CommonReceive<JsonObject,JsonObject>() {
@KafkaListener(topics = [ParamType.ORBIT_FILE_COUNT.topic],containerFactory = "singleContainerFactory")
override fun receive(records: List<ConsumerRecord<Int, ByteArray>>, ack: Acknowledgment) {
super.receive(records, ack)
}
}
однопоточный прослушиватель, который не может получить сообщение сейчас
@Bean
fun batchContainerFactory(consumerFactory : ConsumerFactory<Any,Any>): KafkaListenerContainerFactory<*> {
val containerFactory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
containerFactory.consumerFactory = consumerFactory
containerFactory.setConcurrency(concurrency) //concurrency = 6
containerFactory.isBatchListener = true //批量消费
containerFactory.containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE
return containerFactory
}
многопоточная конфигурация (единственное отличие от однопоточности)является то, что количество потоков составляет 6)
class YCReceive : CommonReceive<YCModel,SingleVisibleData>() {
@KafkaListener(topics = [ParamType.HAWKEYE_S.topic],containerFactory = "batchContainerFactory")
override fun receive(records: List<ConsumerRecord<Int, ByteArray>>, ack: Acknowledgment) {
super.receive(records, ack)
}
}
Многопоточный слушатель
@Bean
fun yc() = NewTopic(ParamType.HAWKEYE_S.topic, 6, 2.toShort())
Многопоточные темы все 6 разделов