Kafka Transactional прочитал совершенный Consumer - PullRequest
1 голос
/ 23 марта 2020

У меня есть транзакционный и обычный Producer в приложении, которое выполняет запись в topi c kafka-topi c, как показано ниже.

Конфигурация для транзакционного Kafka Producer

@Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections to the Kakfa cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        /*The amount of time to wait before attempting to retry a failed request to a given topic partition. 
         * This avoids repeatedly sending requests in a tight loop under some failure scenarios.*/
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
        /*"The configuration controls the maximum amount of time the client will wait "
         "for the response of a request. If the response is not received before the timeout "
         "elapses the client will resend the request if necessary or fail the request if "
         "retries are exhausted.";.*/
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
        /*To avoid duplicate msg*/
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        /*Will wait for ack from broker n all replicas*/
        props.put(ProducerConfig.ACKS_CONFIG, "all");
/*Kafka Transactional Properties */
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id"); // set transaction id
        return props;
    }

    @Bean
    public KafkaProducer<String, String> kafkaProducer() {
        return new KafkaProducer<>(producerConfigs());
    }

Normal Producer конфиг одинаков только сообщения от подписавшегося топи c. Но потребляет ли он транзакционные и нетранзакционные сообщения от topi c. Не хватает ли мне какой-либо конфигурации, чтобы потребитель получал только транзакционные сообщения от подписанных topi c. Заранее спасибо: -)

1 Ответ

2 голосов
/ 23 марта 2020

Так не работает. isolation.level относится только к записям, совершенным транзакционными производителями. Все потребители видят записи, опубликованные нетранзакционными производителями.

Вам нужно использовать две разные темы, чтобы получить желаемое поведение.

...