Kafka Producer Thread, огромное количество тем, даже когда сообщения не отправляются - PullRequest
0 голосов
/ 23 октября 2018

В настоящее время я профилировал приложение весенней загрузки для производителя kafka и обнаружил, что работает много "kafka-provider-network-thread" (всего 47).Который никогда не перестанет работать, даже если данные не отправляются.Мое приложение выглядит примерно так:

var kafkaSender = KafkaSender(kafkaTemplate, applicationProperties)
kafkaSender.sendToKafka(json, rs.getString("KEY"))

с KafkaSender:

@Service
class KafkaSender(val kafkaTemplate: KafkaTemplate<String, String>, val applicationProperties: ApplicationProperties) {

@Transactional(transactionManager = "kafkaTransactionManager")
fun sendToKafka(message: String, stringKey: String) {
   kafkaTemplate.executeInTransaction { kt ->
       kt.send(applicationProperties.kafka.topic, System.currentTimeMillis().mod(10).toInt(), System.currentTimeMillis().rem(10).toString(),
               message)
   }
}

companion object {
    val log = LoggerFactory.getLogger(KafkaSender::class.java)!!
}
}

Поскольку каждый раз, когда я хочу отправить сообщение в Kafka, я создаю новый KafkaSender, я думал, чтобудет создан новый поток, который затем отправит сообщение в очередь kafka.В настоящее время похоже, что пул производителей генерируется, но никогда не очищается, даже если ни один из них не имеет ничего общего.

Это поведение предназначено?

По моему мнению, поведение должно бытьпочти так же, как пул источников данных, поддерживать поток в течение некоторого времени, но когда нечего делать, очистите его.

1 Ответ

0 голосов
/ 23 октября 2018

При использовании транзакций кэш производителя увеличивается по требованию и не уменьшается.

Если вы создаете сообщения в потоке контейнера-получателя (получателя);есть производитель для каждой темы / раздела / группы потребителей.Это необходимо для решения проблемы фехтования зомби, так что если произойдет перебалансировка и раздел переместится в другой экземпляр, идентификатор транзакции останется прежним, чтобы брокер мог правильно обработать ситуацию.

Если вы нене заботясь о проблеме фехтования зомби (и вы можете обрабатывать повторные доставки), установите для свойства producerPerConsumerPartition значение false на DefaultKafkaProducerFactory, и число производителей будет намного меньше.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...