У меня есть транзакционный и обычный Producer в приложении, которое выполняет запись в topi c kafka-topi c, как показано ниже.
Конфигурация для транзакционного Kafka Producer
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections to the Kakfa cluster
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
/*The amount of time to wait before attempting to retry a failed request to a given topic partition.
* This avoids repeatedly sending requests in a tight loop under some failure scenarios.*/
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 3);
/*"The configuration controls the maximum amount of time the client will wait "
"for the response of a request. If the response is not received before the timeout "
"elapses the client will resend the request if necessary or fail the request if "
"retries are exhausted.";.*/
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 1);
/*To avoid duplicate msg*/
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
/*Will wait for ack from broker n all replicas*/
props.put(ProducerConfig.ACKS_CONFIG, "all");
/*Kafka Transactional Properties */
props.put(ProducerConfig.CLIENT_ID_CONFIG, "transactional-producer");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-transactional-id"); // set transaction id
return props;
}
@Bean
public KafkaProducer<String, String> kafkaProducer() {
return new KafkaProducer<>(producerConfigs());
}
Normal Producer конфиг одинаков только сообщения от подписавшегося топи c. Но потребляет ли он транзакционные и нетранзакционные сообщения от topi c. Не хватает ли мне какой-либо конфигурации, чтобы потребитель получал только транзакционные сообщения от подписанных topi c. Заранее спасибо: -)