Весенняя партия имеет средство под названием AsyncItemProcessor
. Он просто оборачивает ItemProcessor
и запускает его с TaskExecutor
, поэтому он может работать асинхронно. Я хочу иметь вызов rest в этом ItemProcessor
, проблема в том, что каждый поток внутри этого TaskExecutor
, который делает вызов rest, будет заблокирован до получения ответа. Я хочу сделать его неблокирующим, что-то вроде реактивной парадигмы.
У меня есть ItemProcessor, который вызывает точку отдыха и получает ответ:
@Bean
public ItemProcessor<String, String> testItemProcessor() {
return item -> {
String url = "http://localhost:8787/test";
try {
// it's a long time process and take a lot of time
String response = restTemplate.exchange(new URI(url), HttpMethod.GET, new RequestEntity(HttpMethod.GET, new URI(url)), String.class).getBody();
return response;
} catch (URISyntaxException e) {
e.printStackTrace();
return null;
}
};
}
Теперь я обертываю это с помощью AsyncItemProcessor
:
@Bean
public AsyncItemProcessor testAsyncItemProcessor() throws Exception {
AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor<>();
asyncItemProcessor.setDelegate(testItemProcessor());
asyncItemProcessor.setTaskExecutor(testThreadPoolTaskExecutor());
asyncItemProcessor.afterPropertiesSet();
return asyncItemProcessor;
}
@Bean
public ThreadPoolTaskExecutor testThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(50);
threadPoolTaskExecutor.setMaxPoolSize(100);
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
return threadPoolTaskExecutor;
}
Я использовал ThreadPoolTaskExecutor
в качестве TaskExecuter
.
Это ItemWriter
:
@Bean
public ItemWriter<String> testItemWriter() {
return items -> {
// I write them to a file and a database, but for simplicity:
for (String item : items) {
System.out.println(item);
}
};
}
@Bean
public AsyncItemWriter asyncTestItemWriter() throws Exception {
AsyncItemWriter asyncItemWriter = new AsyncItemWriter<>();
asyncItemWriter.setDelegate(testItemWriter());
asyncItemWriter.afterPropertiesSet();
return asyncItemWriter;
}
Настройка шага и задания:
@Bean
public Step testStep() throws Exception {
return stepBuilderFactory.get("testStep")
.<String, String>chunk(1000)
.reader(testItemReader())
.processor(testAsyncItemProcessor())
.writer(asyncTestItemWriter())
.build();
}
@Bean
public Job testJob() throws Exception {
return jobBuilderFactory.get("testJob")
.start(testStep())
.build();
}
ItemReader
является простым ListItemReader
:
@Bean
public ItemReader<String> testItemReader() {
List<String> integerList = new ArrayList<>();
for (int i=0; i<10000; i++) {
integerList.add(String.valueOf(i));
}
return new ListItemReader(integerList);
}
Теперь у меня есть ThreadPoolTaskExecutor
с 50 ~ 100 нитями. Каждый поток внутри ItemProcessor
выполняет вызов rest и ожидает / блокирует получение ответа от сервера. Есть ли способ сделать эти вызовы / процесс неблокирующим? Если ответ «да», как я должен проектировать ItemWriter
? Внутри ItemWriter
я хочу записать результаты из ItemProcessor
в файл и базу данных. Каждый блок имеет размер 1000, я могу подождать, пока все записи внутри него не будут обработаны, но я не хочу блокировать поток на каждый вызов rest внутри блока. Есть ли способ выполнить sh это?
Я знаю, что шаблон отдыха Spring - это тот, который делает блокировку процесса и веб-клиент должен использоваться, но есть ли какой-нибудь эквивалентный компонент в весеннем пакете (вместо AsyncItemProcessor / AsyncItemWriter) для веб-клиента?