Я написал весеннее пакетное задание, которое обрабатывает список списков.
Reader возвращает список списков.Процессор работает с каждым ListItem и возвращает обработанный List.Writer записывает данные в DB и sftp из списка List.
У меня есть случай, когда я вызываю Async REST api из весеннего пакетного процессора.В ответе ListenableFuture я реализовал LitenableFutureCallback для обработки успеха и неудачи, который работает, как и ожидалось, но до того, как асинхронный вызов возвращает что-то, ItemProcessor не ждет ответных вызовов из async api и возвращает объект (List) в писатель.
Я не уверен, как реализовать и обрабатывать асинхронные вызовы из ItemProcessor.
Я читал об AsyncItemProcessor и AsyncItemWriter, но я не уверен, что это то, что я должен использовать в этом сценарии.
Я также думал о вызове get () для ответа ListenableFuture из AsyncRestTemplate, но согласно документации он будет блокировать текущий поток, пока не получит ответ.
Я ищу некоторую помощь, как мне это реализовать.Фрагмент кода ниже:
Процессор:
public class MailDocumentProcessor implements ItemProcessor<List<MailingDocsEntity>, List<MailingDocsEntity>> {
... Initialization code
@Override
public List<MailingDocsEntity> process(List<MailingDocsEntity> documentsList) throws Exception {
logger.info("Entering MailingDocsEntity processor");
List<MailingDocsEntity> synchronizedList = Collections.synchronizedList(documentsList);
for (MailingDocsEntity mailingDocsEntity : synchronizedList) {
System.out.println("Reading Mailing id: " + mailingDocsEntity.getMailingId());
..code to get the file
//If the file is not a pdf convert it
String fileExtension = readFromSpResponse.getFileExtension();
String fileName = readFromSpResponse.getFileName();
byte[] fileBytes = readFromSpResponse.getByteArray();
try {
//Do checks to make sure PDF file is being sent
if (!"pdf".equalsIgnoreCase(fileExtension)) {
//Only doc, docx and xlsx conversions are supported
...Building REquest object
//make async call to pdf conversion service
pdfService.convertDocxToPdf(request, mailingDocsEntity);
} else {
logger.error("The file cannot be converted to a pdf.\n"
);
}
}
} catch (Exception ex){
logger.error("There has been an exception while processing data", ex);
}
}
return synchronizedList;
}
}
Класс обслуживания Async PdfConversion:
@Service
public class PdfService{
@Autowired
@Qualifier("MicroServiceAsyncRestTemplate")
AsyncRestTemplate microServiceAsyncRestTemplate;
public ConvertDocxToPdfResponse convertDocxToPdf(ConvertDocxToPdfRequest request, MailingDocsEntity mailingDocsEntity){
ConvertDocxToPdfResponse pdfResponse = new ConvertDocxToPdfResponse();
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
HttpEntity<?> entity = new HttpEntity<>(request, headers);
ListenableFuture<ResponseEntity<ConvertDocxToPdfResponse>> microServiceResponse = microServiceAsyncRestTemplate.postForEntity(batchMailProcessingConfiguration.getPdfUrl(), entity, ConvertDocxToPdfResponse.class);
ConvertDocxToPdfResponse resultBody = microServiceResponse.get().getBody();
microServiceResponse.addCallback(new ListenableFutureCallback<ResponseEntity<ConvertDocxToPdfResponse>>() {
@Override
public void onSuccess(ResponseEntity<ConvertDocxToPdfResponse> result) {
...code to do stuff on success
}
@Override
public void onFailure(Throwable ex) {
pdfResponse.setMessage("Exception while retrieving response");
}
});
} catch (Exception e) {
String message = "There has been an error while issuing a pdf generate request to the pdf micro service";
pdfResponse.setMessage(message);
logger.error(message, e);
}
return pdfResponse;
}
}
Мое исходное пакетное задание было синхронным, я преобразую в Async для более быстрой обработки,Я пытался найти похожие вопросы, но не смог найти достаточно информации.Любые указатели или помощь высоко ценится.
Спасибо !!