В настоящее время я профилировал приложение весенней загрузки для производителя kafka и обнаружил, что работает много "kafka-provider-network-thread" (всего 47).Который никогда не перестанет работать, даже если данные не отправляются.Мое приложение выглядит примерно так:
var kafkaSender = KafkaSender(kafkaTemplate, applicationProperties)
kafkaSender.sendToKafka(json, rs.getString("KEY"))
с KafkaSender:
@Service
class KafkaSender(val kafkaTemplate: KafkaTemplate<String, String>, val applicationProperties: ApplicationProperties) {
@Transactional(transactionManager = "kafkaTransactionManager")
fun sendToKafka(message: String, stringKey: String) {
kafkaTemplate.executeInTransaction { kt ->
kt.send(applicationProperties.kafka.topic, System.currentTimeMillis().mod(10).toInt(), System.currentTimeMillis().rem(10).toString(),
message)
}
}
companion object {
val log = LoggerFactory.getLogger(KafkaSender::class.java)!!
}
}
Поскольку каждый раз, когда я хочу отправить сообщение в Kafka, я создаю новый KafkaSender, я думал, чтобудет создан новый поток, который затем отправит сообщение в очередь kafka.В настоящее время похоже, что пул производителей генерируется, но никогда не очищается, даже если ни один из них не имеет ничего общего.
Это поведение предназначено?
По моему мнению, поведение должно бытьпочти так же, как пул источников данных, поддерживать поток в течение некоторого времени, но когда нечего делать, очистите его.