Вам необходимо показать свой код и конфигурацию, а также версии, которые вы используете.
Транзакции только для производителя обсуждаются в документации .
Включите транзакции, установив в spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix непустое значение, например, tx-.При использовании в приложении процессора потребитель запускает транзакцию;любые записи, отправленные в потоке потребителя, участвуют в той же транзакции.Когда прослушиватель завершается нормально, контейнер слушателя отправляет смещение транзакции и фиксирует его.Общая фабрика производителей используется для настройки всех привязок производителей с использованием spring.cloud.stream.kafka.binder.transaction.producer. * Properties;индивидуальная привязка Свойства производителя Kafka игнорируются.
Если вы хотите использовать транзакции в исходном приложении или из некоторого произвольного потока для транзакции только для производителя (например, метод @Scheduled), вы должны получить ссылку на транзакциюфабрика производителя и определить с ее помощью bean-компонент KafkaTransactionManager.
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
return new KafkaTransactionManager<>(pf);
}
Обратите внимание, что мы получаем ссылку на подшивку, используя BinderFactory;используйте null в первом аргументе, когда настроен только один связыватель.Если настроено более одного подшивки, используйте имя подшивки для получения ссылки.Получив ссылку на связыватель, мы можем получить ссылку на ProducerFactory и создать менеджер транзакций.
Тогда вы просто поддержите обычную поддержку транзакций Spring, например TransactionTemplate или @Transactional, например:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
Если вы хотите синхронизировать транзакции только для производителя с транзакциями из другого менеджера транзакций, используйте ChainedTransactionManager.