Я учусь использовать Кафку, используя Spring Kafka в Котлине. Я понял, что когда публикуется новая тема, она создается, если не существует. Поэтому, когда я отправляю значение в новую / старую тему, созданную из Spring, по умолчанию раздел равен 0, но я хочу написать сообщение в другом разделе, например, в разделе 1.
Когда я создаю / пишу втема, это работает:
val topicTesteKotlin = "topico-teste-kotlin"
fun sendTopicCallback(@PathVariable message : String) : ResponseEntity<String> {
val msg = Optional.of(message)
return if (msg.isPresent) {
kafkaTemplate.send(topicTesteKotlin, message).addCallback({
println("Sent message=[" + message +
"] with offset=[" + it!!.recordMetadata.offset() + "]")
}, {
println("Unable to send message=["
+ message + "] due to : " + it.message)
})
ResponseEntity.ok(msg.get())
} else {
kafkaTemplate.send(topicTesteKotlin, "GET /send_topic_callback/message BadRequest > $message")
ResponseEntity.badRequest().body("Bad request!")
}
}
Но, когда я выбираю раздел и ключ, используя:
val topicTesteKotlin = "topico-teste-kotlin"
fun sendTopicCallback(@PathVariable message : String) : ResponseEntity<String> {
val msg = Optional.of(message)
return if (msg.isPresent) {
kafkaTemplate.send(topicTesteKotlin, 1, "1", message).addCallback({
println("Sent message=[" + message +
"] with offset=[" + it!!.recordMetadata.offset() + "]")
}, {
println("Unable to send message=["
+ message + "] due to : " + it.message)
})
ResponseEntity.ok(msg.get())
} else {
kafkaTemplate.send(topicTesteKotlin, "GET /send_topic_callback/message BadRequest > $message")
ResponseEntity.badRequest().body("Bad request!")
}
}
, я получаю следующую ошибку:
org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).
Я попытался изменить ключ на 0.1
, но также не работает. В сущности, когда я создаю тему из Spring-клиента, создается только раздел, и это 0
.
Конфигурация Kafka Producer
@Configuration
class KafkaProducerConfig {
@Bean
fun producerFactory() : ProducerFactory<String, String> {
val configProps = HashMap<String,Any>()
configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return DefaultKafkaProducerFactory(configProps)
}
@Bean
fun kafkaTemplate() : KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
}
Итак, как мне создать раздел из клиента Spring Kafka?