Spring Batch ItemReader + RabbitMQ - «очередь» не указана - PullRequest
0 голосов
/ 02 марта 2020

У меня запущено 2 службы: одна, которая подключается к БД и отправляет данные брокеру сообщений, а другая должна принять сообщение от кролика и отправить его через пакетную запись на targetDB. У меня одна и та же Конфигурация Rabbit в каждой службе, но по какой-то причине я получаю:

org.springframework.amqp.AmqpIllegalStateException: No 'queue' specified. Check configuration of RabbitTemplate.
    at org.springframework.amqp.rabbit.core.RabbitTemplate.getRequiredQueue(RabbitTemplate.java:2410) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndConvert(RabbitTemplate.java:1203) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE]
    at org.springframework.batch.item.amqp.AmqpItemReader.read(AmqpItemReader.java:57) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.2.RELEASE.jar:5.2.2.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]

2020-03-01 19:50:36.157  INFO 1748 --- [  restartedMain] o.s.batch.core.step.AbstractStep         : Step: [step1] executed in 80ms
2020-03-01 19:50:36.183  INFO 1748 --- [  restartedMain] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=importClientJob]] completed with the following parameters: [{run.id=19}] and the following status: [FAILED] in 142ms

Класс конфигурации:

@Configuration
public class RabbitConfiguration {
    public static final String MESSAGE_EXCHANGE = "clients-exchange";
    public static final String MESSAGE_QUEUE = "clients-queue";
    public static final String MESSAGE_ROUTING_KEY = "clients.msg";

    private final ConnectionFactory connectionFactory;

    public RabbitConfiguration(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    @Bean
    Queue queue() {
        return new Queue(MESSAGE_QUEUE, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(MESSAGE_EXCHANGE);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(MESSAGE_ROUTING_KEY);
    }

    @Bean
    RabbitTemplate rabbitTemplate(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setExchange(MESSAGE_EXCHANGE);
        rabbitTemplate.setRoutingKey(MESSAGE_ROUTING_KEY);
        rabbitTemplate.setTaskExecutor(taskExecutor);
        return rabbitTemplate;
    }
}

И BatchConfiguration:

@Configuration
public class BatchConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Value("${pusher.spring-batch-chunk-size}")
    private int chunkSize;

    private DataSource dataSource;
    private RabbitTemplate rabbitTemplate;
    private NamedParameterJdbcTemplate namedParameterJdbcTemplate;
    private ClientPreparedStatementSetter clientPreparedStatementSetter;

    @Autowired
    public BatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, DataSource dataSource, RabbitTemplate rabbitTemplate, NamedParameterJdbcTemplate namedParameterJdbcTemplate, ClientPreparedStatementSetter clientPreparedStatementSetter) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.dataSource = dataSource;
        this.rabbitTemplate = rabbitTemplate;
        this.namedParameterJdbcTemplate = namedParameterJdbcTemplate;
        this.clientPreparedStatementSetter = clientPreparedStatementSetter;
    }

    @Bean
    public JdbcBatchItemWriter<Client> cursorItemWriter() {
        return new JdbcBatchItemWriterBuilder<Client>()
                .dataSource(this.dataSource)
                .namedParametersJdbcTemplate(namedParameterJdbcTemplate)
                .itemPreparedStatementSetter(clientPreparedStatementSetter)
                .sql("INSERT INTO CLIENT (id, firstname, lastname, email, phone) VALUES (?,?,?,?,?)")
                .build();
    }

    @Bean
    public AmqpItemReader<Client> clientAmqpItemReader() {
        return new AmqpItemReaderBuilder<Client>()
                .amqpTemplate(rabbitTemplate)
                .build();
    }

    @Bean
    public ClientLowerCaseProcessor lowerCaseProcessor() {
        return new ClientLowerCaseProcessor();
    }

    @Bean
    public Job importClientJob(Step step1) {
        return jobBuilderFactory.get("importClientJob")
                .incrementer(new RunIdIncrementer())
                .flow(step1)
                .end()
                .build();
    }

    @Bean
    public Step step1(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
        return stepBuilderFactory.get("step1")
                .<Client, Client>chunk(chunkSize)
                .reader(clientAmqpItemReader())
                .processor(lowerCaseProcessor())
                .writer(cursorItemWriter())
                .taskExecutor(taskExecutor)
                .build();
    }
}

I попытался удалить исполнитель задач и возиться с конфигурацией, но безуспешно

1 Ответ

1 голос
/ 02 марта 2020

org.springframework.amqp.AmqpIllegalStateException: «очередь» не указана. Проверьте конфигурацию RabbitTemplate. в org.springframework.amqp.rabbit.core.RabbitTemplate.getRequiredQueue (RabbitTemplate. java: 2410) ~ [spring-rabbit-2.2.2.RELEASE.jar: 2.2.2.RELEASE] в org.springframework.amqp. rabbit.core.RabbitTemplate.receiveAndConvert (RabbitTemplate. java: 1203) ~ [spring-rabbit-2.2.2.RELEASE.jar: 2.2.2.RELEASE] в org.springframework.batch.item.amqp.AmqpItemReader.read (AmqpItemReader. java: 57) ~ [spring-batch -structure-4.2.1.RELEASE.jar: 4.2.1.RELEASE]

AmqpItemReader в подпружиненных пакетах использует RabbitTemplate#receive() для получения сообщений от RabbitMQ, и для этого требуется установить defaultReceiveQueue в RabbitTemplate, чтобы указать, в какую очередь получать сообщения, но вы не настроили ее. Таким образом, вы должны указать имя очереди при настройке RabbitTemplate:

@Bean
RabbitTemplate rabbitTemplate(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setExchange(MESSAGE_EXCHANGE);
    rabbitTemplate.setRoutingKey(MESSAGE_ROUTING_KEY);
    rabbitTemplate.setDefaultReceiveQueue("someQueue");
    rabbitTemplate.setTaskExecutor(taskExecutor);
    return rabbitTemplate;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...