Транзакция в Spring Cloud Stream - PullRequest
0 голосов
/ 16 мая 2018

Задача : Я пытаюсь прочитать большой файл построчно и помещаю сообщение в RabbitMQ. Я хочу зафиксировать rabbitMQ в конце файла. Если какая-либо запись в файле неверна, я хочу отозвать сообщения, опубликованные в очереди.

Технологии : Весенний ботинок, Весенний облачный поток, RabbitMQ

Не могли бы вы помочь мне в реализации этого переходного материала. Я знаю, как читать файлы и публиковать их в очереди с помощью весеннего облачного потока.

Edit:

  @Transactional
  public void sendToQueue(List<Data> dataList) {

      for(Data data:dataList)
      {
          this.output.send(MessageBuilder.withPayload(data).build());
          counter++; // I can see message getting published in the queue though management plugin
      }
      LOGGER.debug("message sent to Q2");

  }

Вот мой конфиг:

spring: 
   cloud:    
      stream:
        bindings:
           # Q1 input channel
           tpi_q1_input:
            destination: TPI_Q1
            binder: local_rabbit
            content-type: application/json
            group: TPIService
            # Q2 output channel  
           tpi_q2_output:
            destination: TPI_Q2
            binder: local_rabbit
            content-type: application/json
            group: TPIService
            # Q2 input channel
           tpi_q2_input:
            destination: TPI_Q2
            binder: local_rabbit
            content-type: application/json
            group: TPIService     
        binders:
          local_rabbit:
            type: rabbit
            environment:
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
                  virtual-host: /
          rabbit:
            bindings:
                  tpi_q2_output:
                    producer:
                          #autoBindDlq: true
                          transacted: true
                          #batchingEnabled: true
                  tpi_q2_input:  
                   consumer:
                        acknowledgeMode: AUTO
                        #autoBindDlq: true
                        #recoveryInterval: 5000
                        transacted: true       

spring.cloud.stream.default-binder: local_rabbit

Java config

@EnableTransactionManagement
public class QueueConfig {

  @Bean
  public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
    return new RabbitTransactionManager(cf);
  }
}

Приемник

@StreamListener(JmsQueueConstants.QUEUE_2_INPUT)
  @Transactional
  public void receiveMesssage(Data data) {

    logger.info("Message Received in Q2:");
  }

Q2 is the queue getting published in transation

1 Ответ

0 голосов
/ 16 мая 2018
  1. Настройка производителя на использование транзакций ...producer.transacted=true

  2. Публикация сообщений в рамках транзакции (с использованием RabbitTransactionManager).

Используйте обычные механизмы транзакций Spring для # 2 (@Transacted аннотация или TransactionTemplate).

Транзакция будет зафиксирована, если вы выйдете нормально, или откатится, если вы сгенерируете исключение.

EDIT

* * Пример тысяча двадцать-один: * * 1 022
@SpringBootApplication
@EnableBinding(Source.class)
@EnableTransactionManagement
public class So50372319Application {

    public static void main(String[] args) {
        SpringApplication.run(So50372319Application.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(MessageChannel output, RabbitTemplate template, AmqpAdmin admin,
            TransactionalSender sender) {
        admin.deleteQueue("so50372319.group");
        admin.declareQueue(new Queue("so50372319.group"));
        admin.declareBinding(new Binding("so50372319.group", DestinationType.QUEUE, "output", "#", null));
        return args -> {
            sender.send("foo", "bar");
            System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
            System.out.println("Received: " + new String(template.receive("so50372319.group", 10_000).getBody()));
            try {
                sender.send("baz", "qux");
            }
            catch (RuntimeException e) {
                System.out.println(e.getMessage());
            }
            System.out.println("Received: " + template.receive("so50372319.group", 3_000));
        };
    }

    @Bean
    public RabbitTransactionManager transactionManager(ConnectionFactory cf) {
        return new RabbitTransactionManager(cf);
    }

}

@Component
class TransactionalSender {

    private final MessageChannel output;

    public TransactionalSender(MessageChannel output) {
        this.output = output;
    }

    @Transactional
    public void send(String... data) {
        for (String datum : data) {
            this.output.send(new GenericMessage<>(datum));
            if ("qux".equals(datum)) {
                throw new RuntimeException("fail");
            }
        }
    }

}

и

spring.cloud.stream.bindings.output.destination=output
spring.cloud.stream.rabbit.bindings.output.producer.transacted=true

и

Received: foo
Received: bar
fail
Received: null
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...