После зависания первого брокера какая-то тема кафки не получает сообщение - PullRequest
0 голосов
/ 26 февраля 2019

у меня есть 6 брокеров kafka: Kafka1.fat: 9092, kafka2.fat: 9092, kafka3.fat: 9092, kafka4.fat: 9092, kafka5.fat: 9092, kafka6.fat: 9092, и я настроил около 30темы, используя два контейнераFactory.Разница между ними заключается в том, что один является однопоточным, а соответствующая тема занимает 30 разделов по умолчанию.Другой контейнерный завод состоит из 6 потоков с соответствующими темами для 6 разделов.Всегда работает нормально.Но в последнее время kafka1.fat не работает, и тогда у меня есть тема, которую я не могу получить сообщение.Посредством отладки я обнаружил, что тема однопоточных 30 разделов не может получить сообщение, а 6 разделов и 6 потоков работают нормально.Неработающий один из шести посредников не должен влиять на нормальный прием сообщений, почему некоторые темы не будут получать сообщение, есть ли проблемы с моей конфигурацией?

Вот некоторые из моих фрагментов кода

@Bean
fun singleContainerFactory(consumerFactory : ConsumerFactory<Any,Any>): KafkaListenerContainerFactory<*> {
    val containerFactory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
    containerFactory.consumerFactory = consumerFactory
    containerFactory.setConcurrency(1)
    containerFactory.isBatchListener = true //批量消费
    containerFactory.containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE
    return containerFactory
}

однопоточная конфигурация

class FileCountReceive: CommonReceive<JsonObject,JsonObject>() {

    @KafkaListener(topics = [ParamType.ORBIT_FILE_COUNT.topic],containerFactory = "singleContainerFactory")
    override fun receive(records: List<ConsumerRecord<Int, ByteArray>>, ack: Acknowledgment) {
        super.receive(records, ack)
    }
}

однопоточный прослушиватель, который не может получить сообщение сейчас

@Bean
fun batchContainerFactory(consumerFactory : ConsumerFactory<Any,Any>): KafkaListenerContainerFactory<*> {
    val containerFactory = ConcurrentKafkaListenerContainerFactory<Any, Any>()
    containerFactory.consumerFactory = consumerFactory
    containerFactory.setConcurrency(concurrency) //concurrency = 6
    containerFactory.isBatchListener = true //批量消费
    containerFactory.containerProperties.ackMode = AckMode.MANUAL_IMMEDIATE
    return containerFactory
}

многопоточная конфигурация (единственное отличие от однопоточности)является то, что количество потоков составляет 6)

class YCReceive : CommonReceive<YCModel,SingleVisibleData>() {


    @KafkaListener(topics = [ParamType.HAWKEYE_S.topic],containerFactory = "batchContainerFactory")
    override fun receive(records: List<ConsumerRecord<Int, ByteArray>>, ack: Acknowledgment) {
        super.receive(records, ack)
    }
}

Многопоточный слушатель

@Bean
fun yc() = NewTopic(ParamType.HAWKEYE_S.topic, 6, 2.toShort())

Многопоточные темы все 6 разделов

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...