Как я могу определить производителя kafka, который будет иметь другой настраиваемый ключ типа и сериализатор значений в соответствии с необходимостью? - PullRequest
1 голос
/ 06 апреля 2019

У меня есть KafkaProducer, я настроил сериализатор ключей и значений, как показано ниже:

    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "my custom class");   

Теперь мой вопрос: как определить производителя для сериализатора значений другого типа, например:

    props.put("value.serializer", "my custom class1");
    props.put("value.serializer", "my custom class2");

Для лучшей производительности лучше инициализировать одного производителя кафки только через приложение. Итак, как я могу определить производителя kafka, который будет иметь разные настраиваемый ключ типа и сериализатор значений в соответствии с необходимостью?

Ответы [ 2 ]

2 голосов
/ 06 апреля 2019

Вы не можете иметь более одного сериализатора на экземпляр производителя.

Если у вас несколько сериализаторов, вам нужно несколько свойств и, следовательно, несколько сконфигурированных экземпляров производителя.


Однако это не значит, что есть и другие способы обойти это. Spring-Kafka предлагает типы сопоставления для JSON, а Confluent поддерживает многотипные схемы Avro в сериализаторах Java.

0 голосов
/ 06 апреля 2019

Мы можем создать несколько kafkaProducer с различными сериализаторами KEY, VALUE, и мы можем обработать эти kafkaProducers

    @Bean
    public  Map<String, Object> producerProperties() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.RETRIES_CONFIG,"2000");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return configProps;
    }

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        return new KafkaProducer<String, String>(producerProperties());
    }

    @Bean
    public  Map<String, Object> producerJSONProperties() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.RETRIES_CONFIG,"2000");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JSONSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JSONSerializer.class);
        return configProps;
    }

    @Bean
    public KafkaProducer<String, String> kafkaJSONProducer() {
        return new KafkaProducer<String, String>(producerJSONProperties());
    }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...