java asyn c реализация службы с использованием BlockingQueue - PullRequest
0 голосов
/ 13 июля 2020

Я пытаюсь написать свою собственную реализацию службы Asyn c вместе с моей уже существующей синхронной версией.

Пока что у меня есть следующее:

@Service("asynchronousProcessor")
public class AsynchronousProcessor extends Processor {

   private BlockingQueue<Pair<String, MyRequest>> requestQueue = new LinkedBlockingQueue<>();

    public AsynchronousProcessor(final PBRequestRepository pbRequestRepository,
                                     final JobRunner jobRunner) {
        super(pbRequestRepository, jobRunner);
    }

    @Override
    public MyResponse process(MyRequest request, String id) {
        super.saveTheRequestInDB(request);
        // add task to blocking queue and have it processed in the background
    }

}

В основном у меня есть конечная точка RestController, вызывающий process(). Версия asyn c должна поставить запрос в очередь BlockingQueue и обрабатывать его в фоновом режиме.

Я не уверен, как реализовать этот код для решения этой проблемы. Должен ли я использовать ExecutorService и как лучше всего соответствовать текущему дизайну.

Было бы полезно иметь некоторые элементы управления, например, перед выполнением задачи или после выполнения вызовов задачи.

Любой ответ с примерами кода для демонстрации дизайна был бы действительно полезен :)

1 Ответ

0 голосов
/ 13 июля 2020

Если единственное требование - обрабатывать его асинхронно, я настоятельно рекомендую рассмотреть возможность использования встроенной пружины @ Asyn c для этой цели. Однако использование этого подхода не будет совместимым по интерфейсу с вашим существующим методом процесса Processor, поскольку тип возврата ДОЛЖЕН быть либо недействительным, либо завернутым в тип Future. Это ограничение вызвано уважительными причинами, поскольку выполнение asyn c не может немедленно вернуть ответ, поэтому оболочка Future - единственный способ получить доступ к результату, если он понадобится.

В следующем плане решения указывается, что должно быть сделано для переключения с выполнения syn c на выполнение asyn c при сохранении совместимости интерфейса. Все важные моменты отмечены встроенными комментариями. Обратите внимание, хотя это интерфейс совместим, тип возвращаемого значения - null (по причинам, указанным выше). Если вам ОБЯЗАТЕЛЬНО требуется возвращаемое значение в вашем контроллере, то этот подход (или любой подход asyn c в этом отношении) НЕ будет работать, если вы также не переключитесь на контроллер asyn c (другой topi c с большим количеством шире изменения и дизайн хотя). Следующий план также включает хуки до и после выполнения.

/**
 * Base interface extracted from existing Processor.
 * Use this interfae as injection type in the controller along 
 * with @Qualifier("synchProcessor") for using sync processor.
 * Once ready, switch the Qualifier to asynchronousProcessor
 * to start using async instead.
 */
public interface BaseProcessor {
    public MyResponse process(MyRequest request, String id);
}

@Service("synchProcessor")
@Primary
public class Processor implements BaseProcessor {
    @Override
    public MyResponse process(MyRequest request, String id) {
        // normal existing sync logic
    }
}

@Service("asynchronousProcessor")
public class AsynchronousProcessor implements BaseProcessor {
    @Autowired
    private AsynchQueue queue;
    
        
    public MyResponse process(MyRequest request, String id) {
        queue.process(request,id);
        // async execution can not return result immediately
        // this is a hack to have this implementation interface 
        // compatible with existing BaseProcessor
        return null; 
    }
}

@Component
public class AsynchQueue {
    @Autowired
    @Qualifier("synchProcessor")
    private BaseProcessor processor;

    /**
     * This method will be scheduled by spring scheduler and executd 
     * asynchronously using an executor. Presented outline will
     * call preProcess and postProcess methods before actual method
     * execution. Actual method execution is delegated to existing
     * synchProcessor resuing it 100% AS-IS.
     */
    @Override
    @Async
    public void process(MyRequest request, String id) {
        preProcess(request, id);
        MyResponse response = processor.process(request, id);
        postProcess(request, id, response);
    }
    
    private void preProcess(MyRequest request, String id) {
        // add logic for pre processing here
    }
    
    private void postProcess(MyRequest request, String id, MyResponse response) {
        // add logic for post processing here
    }

}

Другой вариант использования может заключаться в пакетной обработке обновлений базы данных вместо их обработки по одному, как вы это уже делаете. Это особенно полезно, если у вас большой объем и обновления базы данных становятся узким местом. В этом случае имеет смысл использовать BlockingQueue. Ниже приводится схема решения, которое вы можете использовать для этой цели. Опять же, хотя это интерфейс совместим, возвращаемый тип по-прежнему имеет значение null. Вы можете дополнительно настроить этот контур, чтобы иметь несколько потоков обработки (или весенний исполнитель, если на то пошло), если это необходимо для пакетной обработки. Для одного аналогичного варианта использования для моих нужд было достаточно одного потока обработки с пакетными обновлениями, одновременные обновления БД представляли более серьезные проблемы из-за блокировок уровня БД при одновременном выполнении.

public class MyRequestAndID {
    private MyRequest request;
    prviate String id;
    
    public MyRequestAndID(MyRequest request, String id){
        this.request = request;
        this.id = id;
    }
    
    public MyRequest getMyRequest() {
        return this.request;
    }
    
    public String MyId() {
        return this.id;
    }
}

@Service("asynchronousProcessor")
public class BatchProcessorQueue implements BaseProcessor{
    /* Batch processor which can process one OR more items using a single DB query */
    @Autowired
    private BatchProcessor batchProcessor;

    private LinkedBlockingQueue<MyRequestAndID> inQueue = new LinkedBlockingQueue<>();

    private Set<MyRequestAndID> processingSet = new HashSet<>();

    @PostConstruct
    private void init() {
        Thread processingThread = new Thread(() -> processQueue());
        processingThread.setName("BatchProcessor");
        processingThread.start();
    }
    
    public MyResponse process(MyRequest request, String id) {
        enqueu(new MyRequestAndID(request, id));
        // async execution can not return result immediately
        // this is a hack to have this implementation interface 
        // compatible with existing BaseProcessor
        return null; 
    }

    public void enqueu(MyRequestAndID job) {
        inQueue.add(job);
    }

    private void processQueue() {
        try {
            while (true) {
                processQueueCycle();
            }
        } catch (InterruptedException ioex) {
            logger.error("Interrupted while processing queue", ioex);
        }
    }

    private void processQueueCycle() throws InterruptedException {
        // blocking call, wait for at least one item
        MyRequestAndID job = inQueue.take();
        processingSet.add(job);
        updateSetFromQueue();
        processSet();
    }

    private void processSet() {
        if (processingSet.size() < 1)
            return;
        int qSize = processingSet.size();
        preProcess(processingSet)
        batchProcessor.processAll(processingSet);
        postProcess(processingSet)
        processingSet.clear();
    }

    private void updateSetFromQueue() {
        List<MyRequestAndID> inData = Arrays.asList(inQueue.toArray(new MyRequestAndID[0]));
        if (inData.size() < 1)
            return;
        inQueue.removeAll(inData);
        processingSet.addAll(inData);
    }
    
    private void preProcess(Set<MyRequestAndID> currentSet) {
        // add logic for pre processing here
    }
    
    private void postProcess(Set<MyRequestAndID> currentSet) {
        // add logic for post processing here
    }
}
...