Как читать файл с помощью Apache Camel, обрабатывать файл с помощью пакета Spring, читать каждую строку и направлять его обратно через Apache Camel - PullRequest
0 голосов
/ 25 апреля 2018

Хотелось бы узнать, как передать элемент JAXBElement в маршрут Camel, который обрабатывается из каждой строки пакетного файла, считываемого через пакет Spring, загруженный через маршрут Camel.

Приведенные ниже фрагменты кода используют метод customerWriter для вызова JMSTemplate для записи сообщения в очередь. Вместо этого мне нужно направить сообщение на другой верблюжий маршрут.

Текущий: CamelRoute -> ReadFile -> Spring Batch -> Обработка каждой строки -> Очередь

Ожидается: CamelRoute -> ReadFile -> Spring Batch -> Обработка каждой строки -> Верблюжий маршрут

Верблюжий маршрут, чтобы прочитать файл:

@Override
public void configure()  {

    String fromUri = batchLoadPath + "?" + batchFileOptions;
    from(fromUri).process(new Processor() {
        public void process(Exchange msg)  {
            File file = msg.getIn().getBody(File.class);
            String fileName = file.getAbsolutePath();

            try {
                JobParameters jobParameters = new JobParametersBuilder().addString("input.file.name", fileName).addDate("dateTime", new Date()).toJobParameters();
                jobLauncher.run(importCustomerJob, jobParameters);
            } catch (Exception e) {
                log.error(Process file encountered error:" + e.getMessage(), e);
            }
        }

    })
    .to("log:EndBatch");

Конфигурация партии:

@Bean
public JmsItemWriter<String> customerWriter() {
    JmsItemWriter<String> writer = new JmsItemWriter<String>();
    writer.setJmsTemplate(jmsTemplate);
    return writer;
}

public Job importCustomerJob(JobCompletionNotificationListener listener, JobBuilderFactory jobBuilderFactory, Step step1) {
    JobBuilder builder = jobBuilderFactory.get("importCustomerJob");
    builder.incrementer(new RunIdIncrementer());
    builder.listener(listener);
    JobFlowBuilder jfb = builder.flow(step1);
    jfb.end();
    Job job = jfb.build().build();
    return job;
}

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory) {
    // Read chunk of 10 records and writing those as messages to queue
    return stepBuilderFactory.get("step1")
            .<Customer, String>chunk(10)
            .reader(customerReader())
            .faultTolerant()
            .skipPolicy(fileVerificationSkipper())
            .processor(customerItemProcessor())
            .writer(customerWriter())
            .listener(customerReader())
            .build();
}

Пакетный процессор:

public class CustomerItemProcessor implements ItemProcessor<Customer, String> {
    @Autowired
    JaxbUtil jaxbUtil;

    public String process(Customer item) throws Exception {
        // Mapping code goes here
        JAXBElement<CustomerX> mobj = customerFactory.createCustomerX(cp);
        return jaxbUtil.objectToXml(mobj);
    }
}

Ответы [ 2 ]

0 голосов
/ 04 мая 2018

Спасибо за ваши предложения @Burki и @Roman Vottner. Вот код, который я модифицировал, и он работал.

Решение:

Добавлен метод записи в Batch Config и вызван вместо JMSWriter

@Bean
public CamelItemWriter<String> customerCamelWriter() {
    ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
    CamelItemWriter<String> writer = new CamelItemWriter<String>(producerTemplate, "direct:process");
    return writer;
}

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory) {
    // Read chunk of 10 records and writing those as messages to CAS.TRF.MDM queue
    return stepBuilderFactory.get("step1")
            .<Customer, String>chunk(10)
            .reader(customerReader())
            .faultTolerant()
            .skipPolicy(fileVerificationSkipper())
            .processor(customerItemProcessor())
            .writer(customerCamelWriter())
            .listener(customerReader())
            .build();
}
0 голосов
/ 25 апреля 2018

Что ж, с точки зрения верблюда, чтобы вызвать другой маршрут верблюда, вы просто добавляете оператор .to(). Например, для синхронного вызова маршрута в памяти вы можете использовать direct:.

from(fromUri).process(new Processor() {
    public void process(Exchange msg)  {
        ... your processor impl
    }
})
.to("direct:yourOtherRoute")
.to("log:EndBatch"); 

from("direct:yourOtherRoute")
...

Чтобы передать результат процессора на следующий маршрут, процессор должен установить этот результат в теле Exchange.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...