Ошибка при создании темы с выбранным разделом на Spring Kafka - PullRequest
0 голосов
/ 03 октября 2019

Я учусь использовать Кафку, используя Spring Kafka в Котлине. Я понял, что когда публикуется новая тема, она создается, если не существует. Поэтому, когда я отправляю значение в новую / старую тему, созданную из Spring, по умолчанию раздел равен 0, но я хочу написать сообщение в другом разделе, например, в разделе 1.

Когда я создаю / пишу втема, это работает:

val topicTesteKotlin = "topico-teste-kotlin"

fun sendTopicCallback(@PathVariable message : String) : ResponseEntity<String> {
        val msg = Optional.of(message)
        return if (msg.isPresent) {
            kafkaTemplate.send(topicTesteKotlin, message).addCallback({
                println("Sent message=[" + message +
                        "] with offset=[" + it!!.recordMetadata.offset() + "]")
            }, {
                println("Unable to send message=["
                        + message + "] due to : " + it.message)
            })
            ResponseEntity.ok(msg.get())
        } else {
            kafkaTemplate.send(topicTesteKotlin, "GET /send_topic_callback/message BadRequest > $message")
            ResponseEntity.badRequest().body("Bad request!")
        }
    }

Но, когда я выбираю раздел и ключ, используя:

val topicTesteKotlin = "topico-teste-kotlin"

fun sendTopicCallback(@PathVariable message : String) : ResponseEntity<String> {
        val msg = Optional.of(message)
        return if (msg.isPresent) {
            kafkaTemplate.send(topicTesteKotlin, 1, "1", message).addCallback({
                println("Sent message=[" + message +
                        "] with offset=[" + it!!.recordMetadata.offset() + "]")
            }, {
                println("Unable to send message=["
                        + message + "] due to : " + it.message)
            })
            ResponseEntity.ok(msg.get())
        } else {
            kafkaTemplate.send(topicTesteKotlin, "GET /send_topic_callback/message BadRequest > $message")
            ResponseEntity.badRequest().body("Bad request!")
        }
    }

, я получаю следующую ошибку:

org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).

Я попытался изменить ключ на 0.1, но также не работает. В сущности, когда я создаю тему из Spring-клиента, создается только раздел, и это 0.

Конфигурация Kafka Producer

@Configuration
class KafkaProducerConfig {

    @Bean
    fun producerFactory() : ProducerFactory<String, String> {
        val configProps = HashMap<String,Any>()
        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory(configProps)
    }

    @Bean
    fun kafkaTemplate() : KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }

}

Итак, как мне создать раздел из клиента Spring Kafka?

1 Ответ

0 голосов
/ 03 октября 2019

Вы можете управлять механизмом создания темы, используя следующий код:

@Configuration
public class KafkaTopicConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
    private String testTopicName = "topico-teste-kotlin";

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,       bootstrapAddress);
    return new KafkaAdmin(configs);
    }
   @Bean
    public NewTopic testTopic() {
        // second parameter is a number of partitions
        return new NewTopic(testTopicName, 2, (short) 1);
    }

}
...