Spring Kafka транзакция включена, но потребитель все еще получил сообщение об откате - PullRequest
0 голосов
/ 30 октября 2019

Я использую транзакцию Spring kafka для своих приложений производителя и потребителя.

Требование на стороне производителя состоит из нескольких шагов: отправить сообщение kafka, а затем сохранить в db. Если сохранить в db не удалось, нужно откатить и сообщение, отправленное на kafka.

Итак, на стороне потребителя я установил isolation.leve на read_committed, тогда, если сообщение является откатом от kafka, потребительне должен его читать.

Код для приложения Producer:

@Configuration
@EnableKafka
public class KafkaConfiguration {

  @Bean
  public ProducerFactory<String, Customer> producerFactory() {
    DefaultKafkaProducerFactory<String, Customer> pf = new DefaultKafkaProducerFactory<>(producerConfigs());
    pf.setTransactionIdPrefix("customer.txn.tx-");
    return pf;
  }

  @Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // create a minimum Producer configs
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "http://127.0.0.1:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
    props.put("schema.registry.url", "http://127.0.0.1:8081");

    // create safe Producer
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5"); // kafka 2.0 >= 1.1 so we can keep this as 5. Use 1 otherwise.

    // high throughput producer (at the expense of a bit of latency and CPU usage)
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
    props.put(ProducerConfig.LINGER_MS_CONFIG, "20");
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32 * 1024)); // 32 KB batch size
    return props;
  }

  @Bean
  public KafkaTemplate<String, Customer> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }

  @Bean
  public KafkaTransactionManager kafkaTransactionManager(ProducerFactory<String, Customer> producerFactory) {
    KafkaTransactionManager<String, Customer> ktm = new KafkaTransactionManager<>(producerFactory);
    ktm.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
    return ktm;
  }

  @Bean
  @Primary
  public JpaTransactionManager jpaTransactionManager(EntityManagerFactory entityManagerFactory) {
    return new JpaTransactionManager(entityManagerFactory);
  }

  @Bean(name = "chainedTransactionManager")
  public ChainedTransactionManager chainedTransactionManager(JpaTransactionManager jpaTransactionManager,
                                                             KafkaTransactionManager kafkaTransactionManager) {
    return new ChainedTransactionManager(kafkaTransactionManager, jpaTransactionManager);
  }
}


@Component
@Slf4j
public class KafkaProducerService {

  private KafkaTemplate<String, Customer> kafkaTemplate;
  private CustomerConverter customerConverter;
  private CustomerRepository customerRepository;

  public KafkaProducerService(KafkaTemplate<String, Customer> kafkaTemplate, CustomerConverter customerConverter, CustomerRepository customerRepository) {
    this.kafkaTemplate = kafkaTemplate;
    this.customerConverter = customerConverter;
    this.customerRepository = customerRepository;
  }

  @Transactional(transactionManager = "chainedTransactionManager", rollbackFor = Exception.class)
  public void sendEvents(String topic, CustomerModel customer) {
    LOGGER.info("Sending to Kafka: topic: {}, key: {}, customer: {}", topic, customer.getKey(), customer);
//    kafkaTemplate.send(topic, customer.getKey(), customerConverter.convertToAvro(customer));
    kafkaTemplate.executeInTransaction(kt -> kt.send(topic, customer.getKey(), customerConverter.convertToAvro(customer)));
    customerRepository.saveToDb();
  }
}


Так что я явно выбрасываю исключение в методе saveToDb, и я вижу, как выбрасывается исключение. Но потребительское приложение все еще может видеть сообщение.

Код для потребителя:

@Slf4j
@Configuration
@EnableKafka
public class KafkaConfiguration {

  @Bean
  ConcurrentKafkaListenerContainerFactory<String, Customer> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Customer> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setAfterRollbackProcessor(new DefaultAfterRollbackProcessor<String, Customer>(-1));


//    SeekToCurrentErrorHandler errorHandler =
//        new SeekToCurrentErrorHandler((record, exception) -> {
//          // recover after 3 failures - e.g. send to a dead-letter topic
////          LOGGER.info("***in error handler data, {}", record);
////          LOGGER.info("***in error handler headers, {}", record.headers());
////          LOGGER.info("value: {}", new String(record.headers().headers("springDeserializerExceptionValue").iterator().next().value()));
//        }, 3);
//
//    factory.setErrorHandler(errorHandler);

    return factory;
  }

  @Bean
  public ConsumerFactory<String, Customer> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }

  @Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
    props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, KafkaAvroDeserializer.class);

    props.put("schema.registry.url", "http://127.0.0.1:8081");
    props.put("specific.avro.reader", "true");
    props.put("isolation.level", "read_committed");

//    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // disable auto commit of offsets
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100"); // disable auto commit of offsets
    return props;
  }
}

@Component
@Slf4j
public class KafkaConsumerService {

  @KafkaListener(id = "demo-consumer-stream-group", topics = "customer.txn")
  @Transactional
  public void process(ConsumerRecord<String, Customer> record) {
    LOGGER.info("Customer key: {} and value: {}", record.key(), record.value());
    LOGGER.info("topic: {}, partition: {}, offset: {}", record.topic(), record.partition(), record.offset());
  }
}

Я что-то здесь упустил?

1 Ответ

0 голосов
/ 30 октября 2019

executeInTransaction будет выполняться в отдельной транзакции. См. Javadocs:

/**
 * Execute some arbitrary operation(s) on the operations and return the result.
 * The operations are invoked within a local transaction and do not participate
 * in a global transaction (if present).
 * @param callback the callback.
 * @param <T> the result type.
 * @return the result.
 * @since 1.1
 */
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

Просто используйте send() для участия в существующей транзакции.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...