Настройка производителя на использование транзакций ...producer.transacted=true
Публикация сообщений в рамках транзакции (с использованием RabbitTransactionManager).
Используйте обычные механизмы транзакций Spring для # 2 (@Transacted
аннотация или TransactionTemplate
).
Транзакция будет зафиксирована, если вы выйдете нормально, или откатится, если вы сгенерируете исключение.
EDIT
* * Пример тысяча двадцать-один: * * 1 022
@SpringBootApplication
@EnableBinding(Source.class)
@EnableTransactionManagement
public class So50372319Application {
public static void main(String[] args) {
SpringApplication.run(So50372319Application.class, args).close();
}
@Bean
public ApplicationRunner runner(MessageChannel output, RabbitTemplate template, AmqpAdmin admin,
TransactionalSender sender) {
admin.deleteQueue("so50372319.group");
admin.declareQueue(new Queue("so50372319.group"));
admin.declareBinding(new Binding("so50372319.group", DestinationType.QUEUE, "output", "#", null));
return args -> {
sender.send("foo", "bar");
System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
try {
sender.send("baz", "qux");
}
catch (RuntimeException e) {
System.out.println(e.getMessage());
}
System.out.println("Received: " + template.receive("so50372319.group", 3_000));
};
}
@Bean
public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
return new RabbitTransactionManager(cf);
}
}
@Component
class TransactionalSender {
private final MessageChannel output;
public TransactionalSender(MessageChannel output) {
this.output = output;
}
@Transactional
public void send(String... data) {
for (String datum : data) {
this.output.send(new GenericMessage<>(datum));
if ("qux".equals(datum)) {
throw new RuntimeException("fail");
}
}
}
}
и
spring.cloud.stream.bindings.output.destination=output
spring.cloud.stream.rabbit.bindings.output.producer.transacted=true
и
Received: foo
Received: bar
fail
Received: null