Я пытаюсь выяснить, есть ли способ использовать функцию транзакции Kafka для записи в две темы внутри транзакции.
Я знаю, что типичный сценарий использования транзакций Kafka заключается в шаблоне потребитель-производитель.и это кажется хорошо документированным.
То, что я пробовал:
- создал
KafkaTransactionManager
для темы - настроил каждую
ProducerFactory
для использования их соответствующей транзакцииmanager - Создан
ChainedTransactionManger
с двумя экземплярами KafkaTransactionManager
Создан KafkaTemplate
по теме
Затем я использовал аннотацию @Transactional(transactionManager = "chainedTx")
на метод, который это делает:
template1.send("topic1", "example payload");
template2.send("topic2", "example payload");
Это не работает.KafkaTemplate
является транзакционным, но когда вызывается метод send()
, транзакция не выполняется, и я получаю IllegalStateException
.
Я собирался попробовать метод KafkaTemplate.executeInTransaction()
, ноJavadoc заявляет, что это только для локальных транзакций, поэтому он не соответствует моим потребностям.
Мой следующий шаг - попытаться напрямую использовать API производителя Kafka, чтобы посмотреть, работает ли этот шаблон, но я был бы признателен, если быкто-то может сказать мне, что я трачу свое время, и Кафка не поддерживает транзакционную запись в несколько тем.
Я нашел это утверждение в блоге Confluent по поддержке транзакций Кафки:
Транзакции включают атомарную запись в несколько тем и разделов Kafka ...
Но я не нашел примеров, демонстрирующих это.
Конфигурация первого производителя
@ Конфигурация открытого класса ControlProducerConfig {
@Bean("controlTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
return new KafkaTransactionManager<>(factory());
}
@Bean("controlTemplate")
public KafkaTemplate<String, String> template() {
return new KafkaTemplate<>(factory());
}
private ProducerFactory<String, String> factory() {
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
factory.setTransactionIdPrefix("abcd");
return factory;
}
private Map<String, Object> config() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
// you can't set idempotence without setting max in flight requests to <= 5
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");
return props;
}
}
Конфигурация второго производителя
@Configuration
public class PayloadProducerConfig {
@Bean("payloadTransactionManager")
KafkaTransactionManager<String, String> transactionManager() {
return new KafkaTransactionManager<>(factory());
}
@Bean("payloadTemplate")
public KafkaTemplate<String, String> template() {
return new KafkaTemplate<>(factory());
}
private ProducerFactory<String, String> factory() {
DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(config());
factory.setTransactionIdPrefix("abcd");
return factory;
}
private Map<String, Object> config() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx.xxx.xxx.xxx");
props.put("schema.registry.url", "http://xxx.xxx.xxx.xxx/");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
// you can't set idempotence without setting max in flight requests to <= 5
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "1234");
return props;
}
}
Основной класс
@EnableTransactionManagement
@SpringBootApplication
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
@Bean("chainedTx")
public ChainedTransactionManager chained(
@Qualifier("controlTransactionManager") KafkaTransactionManager controlTransactionManager,
@Qualifier("payloadTransactionManager") KafkaTransactionManager payloadTransactionManager) {
return new ChainedTransactionManager(controlTransactionManager, payloadTransactionManager);
}
@Bean OnStart onStart(PostTwoMessages postTwoMessages) {
return new OnStart(postTwoMessages);
}
@Bean
public PostTwoMessages postTwoMessages(
@Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
@Qualifier("controlTemplate") KafkaTemplate<String, String> payloadTemplate) {
return new PostTwoMessages(controlTemplate, payloadTemplate);
}
}
При запуске приложения
public class OnStart implements ApplicationListener<ApplicationReadyEvent> {
private PostTwoMessages postTwoMessages;
public OnStart(PostTwoMessages postTwoMessages) {
this.postTwoMessages = postTwoMessages;
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
postTwoMessages.run();
}
}
Отправка двух сообщений
public class PostTwoMessages {
private final KafkaTemplate<String, String> controlTemplate;
private final KafkaTemplate<String, String> payloadTemplate;
public PostTwoMessages(
@Qualifier("controlTemplate") KafkaTemplate<String, String> controlTemplate,
@Qualifier("payloadTemplate") KafkaTemplate<String, String> payloadTemplate) {
this.controlTemplate = controlTemplate;
this.payloadTemplate = payloadTemplate;
}
@Transactional(transactionManager = "chainedTx")
public void run() {
UUID uuid = UUID.randomUUID();
controlTemplate.send("private.s0869y.trx.model3a", "control: " + uuid);
payloadTemplate.send("private.s0869y.trx.model3b", "payload: " + uuid);
}
}