Если единственное требование - обрабатывать его асинхронно, я настоятельно рекомендую рассмотреть возможность использования встроенной пружины @ Asyn c для этой цели. Однако использование этого подхода не будет совместимым по интерфейсу с вашим существующим методом процесса Processor, поскольку тип возврата ДОЛЖЕН быть либо недействительным, либо завернутым в тип Future. Это ограничение вызвано уважительными причинами, поскольку выполнение asyn c не может немедленно вернуть ответ, поэтому оболочка Future - единственный способ получить доступ к результату, если он понадобится.
В следующем плане решения указывается, что должно быть сделано для переключения с выполнения syn c на выполнение asyn c при сохранении совместимости интерфейса. Все важные моменты отмечены встроенными комментариями. Обратите внимание, хотя это интерфейс совместим, тип возвращаемого значения - null (по причинам, указанным выше). Если вам ОБЯЗАТЕЛЬНО требуется возвращаемое значение в вашем контроллере, то этот подход (или любой подход asyn c в этом отношении) НЕ будет работать, если вы также не переключитесь на контроллер asyn c (другой topi c с большим количеством шире изменения и дизайн хотя). Следующий план также включает хуки до и после выполнения.
/**
* Base interface extracted from existing Processor.
* Use this interfae as injection type in the controller along
* with @Qualifier("synchProcessor") for using sync processor.
* Once ready, switch the Qualifier to asynchronousProcessor
* to start using async instead.
*/
public interface BaseProcessor {
public MyResponse process(MyRequest request, String id);
}
@Service("synchProcessor")
@Primary
public class Processor implements BaseProcessor {
@Override
public MyResponse process(MyRequest request, String id) {
// normal existing sync logic
}
}
@Service("asynchronousProcessor")
public class AsynchronousProcessor implements BaseProcessor {
@Autowired
private AsynchQueue queue;
public MyResponse process(MyRequest request, String id) {
queue.process(request,id);
// async execution can not return result immediately
// this is a hack to have this implementation interface
// compatible with existing BaseProcessor
return null;
}
}
@Component
public class AsynchQueue {
@Autowired
@Qualifier("synchProcessor")
private BaseProcessor processor;
/**
* This method will be scheduled by spring scheduler and executd
* asynchronously using an executor. Presented outline will
* call preProcess and postProcess methods before actual method
* execution. Actual method execution is delegated to existing
* synchProcessor resuing it 100% AS-IS.
*/
@Override
@Async
public void process(MyRequest request, String id) {
preProcess(request, id);
MyResponse response = processor.process(request, id);
postProcess(request, id, response);
}
private void preProcess(MyRequest request, String id) {
// add logic for pre processing here
}
private void postProcess(MyRequest request, String id, MyResponse response) {
// add logic for post processing here
}
}
Другой вариант использования может заключаться в пакетной обработке обновлений базы данных вместо их обработки по одному, как вы это уже делаете. Это особенно полезно, если у вас большой объем и обновления базы данных становятся узким местом. В этом случае имеет смысл использовать BlockingQueue. Ниже приводится схема решения, которое вы можете использовать для этой цели. Опять же, хотя это интерфейс совместим, возвращаемый тип по-прежнему имеет значение null. Вы можете дополнительно настроить этот контур, чтобы иметь несколько потоков обработки (или весенний исполнитель, если на то пошло), если это необходимо для пакетной обработки. Для одного аналогичного варианта использования для моих нужд было достаточно одного потока обработки с пакетными обновлениями, одновременные обновления БД представляли более серьезные проблемы из-за блокировок уровня БД при одновременном выполнении.
public class MyRequestAndID {
private MyRequest request;
prviate String id;
public MyRequestAndID(MyRequest request, String id){
this.request = request;
this.id = id;
}
public MyRequest getMyRequest() {
return this.request;
}
public String MyId() {
return this.id;
}
}
@Service("asynchronousProcessor")
public class BatchProcessorQueue implements BaseProcessor{
/* Batch processor which can process one OR more items using a single DB query */
@Autowired
private BatchProcessor batchProcessor;
private LinkedBlockingQueue<MyRequestAndID> inQueue = new LinkedBlockingQueue<>();
private Set<MyRequestAndID> processingSet = new HashSet<>();
@PostConstruct
private void init() {
Thread processingThread = new Thread(() -> processQueue());
processingThread.setName("BatchProcessor");
processingThread.start();
}
public MyResponse process(MyRequest request, String id) {
enqueu(new MyRequestAndID(request, id));
// async execution can not return result immediately
// this is a hack to have this implementation interface
// compatible with existing BaseProcessor
return null;
}
public void enqueu(MyRequestAndID job) {
inQueue.add(job);
}
private void processQueue() {
try {
while (true) {
processQueueCycle();
}
} catch (InterruptedException ioex) {
logger.error("Interrupted while processing queue", ioex);
}
}
private void processQueueCycle() throws InterruptedException {
// blocking call, wait for at least one item
MyRequestAndID job = inQueue.take();
processingSet.add(job);
updateSetFromQueue();
processSet();
}
private void processSet() {
if (processingSet.size() < 1)
return;
int qSize = processingSet.size();
preProcess(processingSet)
batchProcessor.processAll(processingSet);
postProcess(processingSet)
processingSet.clear();
}
private void updateSetFromQueue() {
List<MyRequestAndID> inData = Arrays.asList(inQueue.toArray(new MyRequestAndID[0]));
if (inData.size() < 1)
return;
inQueue.removeAll(inData);
processingSet.addAll(inData);
}
private void preProcess(Set<MyRequestAndID> currentSet) {
// add logic for pre processing here
}
private void postProcess(Set<MyRequestAndID> currentSet) {
// add logic for post processing here
}
}