Метод stream(ItemStream) в типе AbstractTaskletStepBuilder<SimpleStepBuilder<Customer,Customer>> неприменим для аргументов (ItemWriter<Customer>) - PullRequest
1 голос
/ 16 июня 2020

Как классифицировать элементы с помощью Spring Batch? Я хочу записывать данные в две разные таблицы или два разных файла, но пока для простоты вывожу их в консоль.

Ошибка:

Метод stream(ItemStream) в типе AbstractTaskletStepBuilder<SimpleStepBuilder<Customer,Customer>> неприменим для аргументов (ItemWriter<Customer>)

/**
 * Spring Boot application that configures a Spring Batch job which reads customers from a
 * CSV resource and routes every record to one of two console writers through a
 * {@link ClassifierCompositeItemWriter}.
 *
 * <p>The composite writer does not propagate the {@code ItemStream} lifecycle callbacks to the
 * writers it delegates to, so both delegates are additionally registered on the step with
 * {@code stream(...)}. That only compiles when the delegates are declared as
 * {@link ItemStreamWriter} (i.e. they are {@code ItemStream}s), which is why both bean methods
 * return that type.
 */
@EnableBatchProcessing
@SpringBootApplication
public class ClassifierCompositeItemApplication {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    /** CSV file with one customer per line; injected from the classpath. */
    @Value("classpath:input/customer.csv")
    private Resource inputResource;

    /**
     * Creates the application configuration.
     *
     * @param jobs  factory used to build the job
     * @param steps factory used to build the step
     */
    public ClassifierCompositeItemApplication(JobBuilderFactory jobs, StepBuilderFactory steps) {
        this.jobBuilderFactory = jobs;
        this.stepBuilderFactory = steps;
    }

    /**
     * Builds the step-scoped reader that maps each delimited line of {@link #inputResource}
     * onto a {@link Customer}.
     *
     * @return the flat-file reader named {@code customerFileReader}
     */
    @Bean
    @StepScope
    public FlatFileItemReader<Customer> classifierCompositeWriterItemReader() {
        return new FlatFileItemReaderBuilder<Customer>()
                .name("customerFileReader")
                .resource(inputResource)
                .delimited()
                .names(new String[] { "firstName", "middleInitial", "lastName", "address", "city", "state", "zip" })
                .targetType(Customer.class)
                .build();
    }

    /**
     * Builds the composite writer that asks a {@link CustomerClassifier} which delegate should
     * receive each customer.
     *
     * @return the classifier-driven composite writer
     * @throws IOException declared because the delegate bean methods declare it
     */
    @Bean
    public ClassifierCompositeItemWriter<Customer> compositeItemWriter() throws IOException {
        final CustomerClassifier classifier = new CustomerClassifier(this.customer1(), this.customer2());

        // The builder expects a classifier whose result type is "ItemWriter<? super Customer>".
        // Generic types are invariant, so a classifier declared to return
        // "ItemStreamWriter<? super Customer>" cannot be passed directly; the method reference
        // adapts it, because the returned ItemStreamWriter is itself an ItemWriter.
        return new ClassifierCompositeItemWriterBuilder<Customer>()
                .classifier(classifier::classify)
                .build();
    }

    /**
     * First delegate writer: prints every customer it receives to standard output.
     *
     * @return a writer whose {@code ItemStream} callbacks are no-ops
     * @throws IOException never thrown by this implementation; kept for signature compatibility
     */
    @Bean
    @StepScope
    public ItemStreamWriter<Customer> customer1() throws IOException {
        System.out.println("Customer #1");
        return new ItemStreamWriter<Customer>() {

            @Override
            public void open(ExecutionContext executionContext) throws ItemStreamException {
                // Nothing to open: the writer only prints to the console.
            }

            @Override
            public void update(ExecutionContext executionContext) throws ItemStreamException {
                // No state to persist between chunks.
            }

            @Override
            public void close() throws ItemStreamException {
                // Nothing to release.
            }

            @Override
            public void write(List<? extends Customer> items) throws Exception {
                for (Customer customer : items) {
                    System.out.println(customer);
                }
            }
        };
    }

    /**
     * Second delegate writer: prints every customer it receives to standard output.
     *
     * <p>It is typed on {@link Customer} (not on a different item type) because the step's
     * items are customers and {@link CustomerClassifier} requires both delegates to be
     * {@code ItemStreamWriter<Customer>}.
     *
     * @return a writer whose {@code ItemStream} callbacks are no-ops
     */
    @Bean
    public ItemStreamWriter<Customer> customer2() {
        System.out.println("Customer #2");
        return new ItemStreamWriter<Customer>() {

            @Override
            public void open(ExecutionContext executionContext) throws ItemStreamException {
                // Nothing to open: the writer only prints to the console.
            }

            @Override
            public void update(ExecutionContext executionContext) throws ItemStreamException {
                // No state to persist between chunks.
            }

            @Override
            public void close() throws ItemStreamException {
                // Nothing to release.
            }

            @Override
            public void write(List<? extends Customer> items) throws Exception {
                for (Customer customer : items) {
                    System.out.println(customer);
                }
            }
        };
    }


    /**
     * Builds the chunk-oriented step (chunk size 10) that wires the reader to the composite
     * writer and registers both delegates as streams so that their lifecycle callbacks run.
     *
     * @return the step named {@code compositeWriterStep}
     * @throws IOException declared because the writer bean methods declare it
     */
    @Bean
    public Step classifierCompositeWriterStep() throws IOException {
        return this.stepBuilderFactory.get("compositeWriterStep")
                .<Customer, Customer>chunk(10)
                .reader(this.classifierCompositeWriterItemReader())
                .writer(this.compositeItemWriter())
                .stream(this.customer1())
                .stream(this.customer2())
                .build();

    }


    /**
     * Builds the single-step job.
     *
     * @return the job named {@code compositeWriterJob}
     * @throws IOException declared because the step bean method declares it
     */
    @Bean
    public Job classifierCompositeWriterJob() throws IOException {
        return this.jobBuilderFactory.get("compositeWriterJob")
                .start(this.classifierCompositeWriterStep())
                .build();
    }


    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(ClassifierCompositeItemApplication.class, args);
    }
}

CustomerClassifier.java

/**
 * Chooses, per {@link Customer}, which of two delegate writers should receive the item.
 * The decision is based solely on the customer's state value.
 */
@Data
public class CustomerClassifier implements Classifier<Customer, ItemStreamWriter<? super Customer>> {
    private static final long serialVersionUID = 1L;

    private final ItemStreamWriter<Customer> fileItemWriter;
    private final ItemStreamWriter<Customer> jdbcItemWriter;

    /**
     * Selects the delegate writer for the given customer.
     *
     * @param customer the item being written; its state must not be {@code null}
     * @return {@code fileItemWriter} when the state starts with a letter from A to M,
     *         otherwise {@code jdbcItemWriter}
     */
    @Override
    public ItemStreamWriter<? super Customer> classify(Customer customer) {
        final boolean stateStartsWithAToM = customer.getState().matches("^[A-M].*");
        return stateStartsWithAToM ? fileItemWriter : jdbcItemWriter;
    }
}

Customer.java

/**
 * Customer record read from the input CSV file; one instance per line.
 */
@Data
public class Customer implements Serializable {

    private String firstName;
    private String middleInitial;
    private String lastName;
    private String address;
    private String city;
    private String state;
    private String zip;

    /**
     * Renders all fields as {@code Customer{firstName='...', ..., zip='...'}}.
     *
     * @return a single-line, human-readable representation of this customer
     */
    @Override
    public String toString() {
        // No separator before the first field: the previous form produced "Customer{, firstName=...".
        return "Customer{" + "firstName='" + firstName + '\'' + ", middleInitial='" + middleInitial + '\''
                + ", lastName='" + lastName + '\'' + ", address='" + address + '\'' + ", city='" + city + '\''
                + ", state='" + state + '\'' + ", zip='" + zip + '\'' + '}';
    }
}

schema.sql

-- Table in the "springbatch" schema with one row per customer.
-- Every column is an unbounded, nullable varchar; no primary key or other constraint is declared.
-- NOTE(review): the last column is named "zipcode" while the Customer class calls the field "zip" —
-- confirm the mapping used by whatever writer inserts into this table.
CREATE TABLE springbatch.TBL_CUSTOMER_WRITER (
    firstname varchar NULL,
    middleinitial varchar NULL,
    lastname varchar NULL,
    address varchar NULL,
    city varchar NULL,
    state varchar NULL,
    zipcode varchar NULL
)
-- Storage parameter: create the table without OIDs.
WITH (
    OIDS=FALSE
) ;

Customer.csv

Richard,N,Darrow,5570 Isabella Ave,St. Louis,IL,58540
Barack,G,Donnelly,7844 S. Greenwood Ave,Houston,CA,38635
Ann,Z,Benes,2447 S. Greenwood Ave,Las Vegas,NY,55366
Laura,9S,Minella,8177 4th Street,Dallas,FL,04119
Erica,Z,Gates,3141 Farnam Street,Omaha,CA,57640
Warren,L,Darrow,4686 Mt. Lee Drive,St. Louis,NY,94935
Warren,M,Williams,6670 S. Greenwood Ave,Hollywood,FL,37288
Harry,T,Smith,3273 Isabella Ave,Houston,FL,97261
Steve,O,James,8407 Infinite Loop Drive,Las Vegas,WA,90520
Erica,Z,Neuberger,513 S. Greenwood Ave,Miami,IL,12778
Aimee,C,Hoover,7341 Vel Avenue,Mobile,AL,35928
Jonas,U,Gilbert,8852 In St.,Saint Paul,MN,57321
Regan,M,Darrow,4851 Nec Av.,Gulfport,MS,33193
Stuart,K,Mckenzie,5529 Orci Av.,Nampa,ID,18562
Sydnee,N,Robinson,894 Ornare. Ave,Olathe,KS,25606

1 Ответ

1 голос
/ 17 июня 2020

Здесь вы регистрируете делегируемые компоненты записи (ItemWriter) как потоки (streams):

.stream(this.customer1())
.stream(this.customer2())

Но они не являются ItemStream. Вам нужно изменить тип возвращаемого значения методов customer1() и customer2() на ItemStreamWriter.

EDIT: добавляю пример:

import java.util.Arrays;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ClassifierCompositeItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.batch.item.support.builder.ClassifierCompositeItemWriterBuilder;
import org.springframework.classify.Classifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Self-contained Spring Batch sample: two in-memory customers are read and each one is sent
 * to either the "foo" or the "bar" console writer by a {@link ClassifierCompositeItemWriter}.
 */
@Configuration
@EnableBatchProcessing
public class MyJob {

    /**
     * Supplies the sample input.
     *
     * @return a reader over two customers named "foo" and "bar"
     */
    @Bean
    public ItemReader<Customer> itemReader() {
        Customer first = new Customer("foo");
        Customer second = new Customer("bar");
        return new ListItemReader<>(Arrays.asList(first, second));
    }

    /**
     * @return a writer that prints each item's name prefixed with {@code "foo writer: item "}
     */
    @Bean
    public ItemWriter<Customer> fooWriter() {
        return consoleWriter("foo writer: item ");
    }

    /**
     * @return a writer that prints each item's name prefixed with {@code "bar writer: item "}
     */
    @Bean
    public ItemWriter<Customer> barWriter() {
        return consoleWriter("bar writer: item ");
    }

    /**
     * Builds the composite writer that delegates to {@link #fooWriter()} or
     * {@link #barWriter()} according to {@link CustomerClassifier}.
     *
     * @return the classifier-driven composite writer
     */
    @Bean
    public ClassifierCompositeItemWriter<Customer> classifierCompositeItemWriter() {
        ClassifierCompositeItemWriterBuilder<Customer> builder = new ClassifierCompositeItemWriterBuilder<>();
        builder.classifier(new CustomerClassifier(this.fooWriter(), this.barWriter()));
        return builder.build();
    }

    /**
     * Builds a job with a single chunk-oriented step (chunk size 5) that connects the reader
     * to the composite writer.
     *
     * @param jobs  factory used to build the job
     * @param steps factory used to build the step
     * @return the job named {@code job}
     */
    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(
                        steps.get("step")
                                .<Customer, Customer>chunk(5)
                                .reader(itemReader())
                                .writer(classifierCompositeItemWriter())
                                .build())
                .build();
    }

    /**
     * Boots the application context from this configuration and launches the job once with
     * empty parameters.
     *
     * @param args unused
     * @throws Exception if the job cannot be launched
     */
    public static void main(String[] args) throws Exception {
        ApplicationContext applicationContext = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher launcher = applicationContext.getBean(JobLauncher.class);
        Job jobToRun = applicationContext.getBean(Job.class);
        launcher.run(jobToRun, new JobParameters());
    }

    /** Creates a writer that prints {@code prefix} followed by the name of every item it receives. */
    private static ItemWriter<Customer> consoleWriter(String prefix) {
        return items -> items.forEach(item -> System.out.println(prefix + item.name));
    }

    /** Sample item carrying only a name. */
    static class Customer {
        String name;

        public Customer(String name) {
            this.name = name;
        }
    }

    /** Picks the delegate writer from the first letter of the customer's name. */
    static class CustomerClassifier implements Classifier<Customer, ItemWriter<? super Customer>> {

        private ItemWriter<? super Customer> fooItemWriter;
        private ItemWriter<? super Customer> barItemWriter;

        public CustomerClassifier(ItemWriter<? super Customer> fooItemWriter, ItemWriter<? super Customer> barItemWriter) {
            this.fooItemWriter = fooItemWriter;
            this.barItemWriter = barItemWriter;
        }

        /**
         * @param customer the item being written
         * @return the "foo" writer when the name starts with {@code "f"}, otherwise the "bar" writer
         */
        @Override
        public ItemWriter<? super Customer> classify(Customer customer) {
            if (customer.name.startsWith("f")) {
                return fooItemWriter;
            }
            return barItemWriter;
        }
    }

}

Этот пример выводит:

foo writer: item foo
bar writer: item bar
...