ElasticsearchItemReader продолжает читать те же записи - PullRequest
0 голосов
/ 10 июля 2019

Я действительно новичок в Spring, и мне нужно разработать приложение с использованием Spring-Batch.Это приложение должно считывать из индекса эластичного поиска и записывать все записи в файл.

При запуске программы я не получаю никаких ошибок, и приложение считывает записи и правильно записывает их в файл.,Дело в том, что приложение никогда не останавливается и продолжает чтение, обработку и запись данных без остановки.На следующем рисунке вы можете видеть, что одни и те же записи обрабатываются много раз.

enter image description here

Я думаю, что должна быть какая-то проблема в моем коде или моем дизайнепрограммное обеспечение, поэтому я прилагаю к этому наиболее важные части моего кода.

Я разработал следующий ElasticsearchItemReader:

public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

private final Logger logger;

private final ElasticsearchOperations elasticsearchOperations;

private final SearchQuery query;

private final Class<? extends T> targetType;

public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends T> targetType) {
    setName(getShortName(getClass()));
    logger = getLogger(getClass());
    this.elasticsearchOperations = elasticsearchOperations;
    this.query = query;
    this.targetType = targetType;
}

@Override
public void afterPropertiesSet() throws Exception {
    state(elasticsearchOperations != null, "An ElasticsearchOperations implementation is required.");
    state(query != null, "A query is required.");
    state(targetType != null, "A target type to convert the input into is required.");
}

@Override
@SuppressWarnings("unchecked")
protected Iterator<T> doPageRead() {

    logger.debug("executing query {}", query.getQuery());

    return (Iterator<T>)elasticsearchOperations.queryForList(query, targetType).iterator();
}
}

Также я написал следующий ReadWriterConfig:

@Configuration
public class ReadWriterConfig {

@Bean
public ElasticsearchItemReader<AnotherElement> elasticsearchItemReader() {

    return new ElasticsearchItemReader<>(elasticsearchOperations(), query(), AnotherElement.class);
}


@Bean
public SearchQuery query() {

    NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder()
            .withQuery(matchAllQuery());

    return builder.build();
}

@Bean
public ElasticsearchOperations elasticsearchOperations()  {

    Client client = null;
    try {
        Settings settings = Settings.builder()
                .build();

        client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
        return new ElasticsearchTemplate(client);
    } catch (UnknownHostException e) {
        e.printStackTrace();
        return null;
    }


}
}

И я написал пакетную конфигурацию, в которой я звоню читателю, автору и процессору:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;

// tag::readerwriterprocessor[]
@Bean
public ElasticsearchItemReader<AnotherElement> reader() {
    return  new ReadWriterConfig().elasticsearchItemReader();
}

@Bean
public PersonItemProcessor processor() {
    return new PersonItemProcessor();
}

@Bean
public FlatFileItemWriter itemWriter() {
    return  new FlatFileItemWriterBuilder<AnotherElement>()
            .name("itemWriter")
            .resource(new FileSystemResource("target/output.txt"))
            .lineAggregator(new PassThroughLineAggregator<>())
            .build();
}

// end::readerwriterprocessor[]

// tag::jobstep[]
@Bean
public Job importUserJob(JobCompletionNotificationListener listener, Step stepA) {
    return jobBuilderFactory.get("importUserJob")
            .flow(stepA)
            .end()
            .build();
}



@Bean
public Step stepA(FlatFileItemWriter<AnotherElement> writer) {
    return stepBuilderFactory.get("stepA")
            .<AnotherElement, AnotherElement> chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(itemWriter())
            .build();
}
// end::jobstep[]

}

Я прикрепляю некоторые веб-сайты, которые я написал, чтобы написать этот код:

https://github.com/spring-projects/spring-batch-extensions/blob/master/spring-batch-elasticsearch/README.md

https://spring.io/guides/gs/batch-processing/

Ответы [ 2 ]

0 голосов
/ 11 июля 2019

Ваш читатель должен возвращать Iterator для каждого вызова doPageRead(), с которым можно выполнить итерации по одной странице набора данных. Поскольку вы не разбиваете результат запроса Elasticsearch на страницы, а запрашиваете весь набор за один шаг, вы возвращаете при первом вызове doPageRead() итератор для всего набора результатов. Затем при следующем вызове вы снова возвращаете итератор для того же набора результатов.

Так что вы должны следить, если вы уже вернули итератор, что-то вроде:

public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    // leaving out irrelevant parts

    boolean doPageReadCalled = false;

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {

        if(doPageReadCalled) {
            return null;
        }

        doPageReadCalled = true

        return (Iterator<T>)elasticsearchOperations.queryForList(query, targetType).iterator();
    }
}

При первом вызове вы устанавливаете флаг true, а затем возвращаете итератор, при следующем вызове вы видите, что вы уже вернули данные и возвращаете null.

Это очень простое решение, в зависимости от объема данных, которые вы получаете от Elasticsearch, может быть лучше запросить, например, с помощью API прокрутки и возврата страниц, пока все не будут обработаны.

0 голосов
/ 11 июля 2019

Вы должны убедиться, что ваш считыватель возвратил null в какой-то момент, чтобы указать, что больше нет данных для обработки и завершения задания.

Как и просили в комментариях, вот пример того, как импортировать читатель:

@Configuration
@org.springframework.context.annotation.Import(ReadWriterConfig.class)
@EnableBatchProcessing
public class BatchConfiguration {

   // other bean definitions

   @Bean
   public Step stepA(ElasticsearchItemReader<AnotherElement> reader, FlatFileItemWriter<AnotherElement> writer) {
      return stepBuilderFactory.get("stepA")
        .<AnotherElement, AnotherElement> chunk(10)
        .reader(reader)
        .processor(processor())
        .writer(writer)
        .build();
   }
}

Надеюсь, это поможет.

...