Spring TaskExecutor неограниченная очередь - PullRequest
0 голосов
/ 04 июня 2018

Я реализовал Spring-TaskExecutor (который эквивалентен Executor JDK 1.5) для обработки уведомлений, получаемых из внешних систем.

Интерфейс только с одним методом:

 public interface AsynchronousService {
    void executeAsynchronously(Runnable task);
}

и соответствующая реализация:

public class AsynchronousServiceImpl implements AsynchronousService {

    private TaskExecutor taskExecutor;

    @Override
    public void executeAsynchronously(Runnable task) {
        taskExecutor.execute(task);
    }

    @Required
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }
}

Xml-конфигурация исполнителя задачи (унаследованное приложение):

<bean id="taskExecutor" class="org.example.impl.NotificationPool">
        <property name="corePoolSize" value="1"/>
        <property name="maxPoolSize" value="1"/>
        <!--<property name="queueCapacity" value="100"/>-->
        <property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>

1 задано для обоих, corePoolSize и maxPoolSize, поскольку яхочу, чтобы задачи выполнялись последовательно (в пуле, который обрабатывает задачи, создается только 1 поток).

Я хочу заказать задачу по дате, когда я получил уведомление, поэтому мне нужно переопределить эту функциюЧтобы разрешить упорядочение приоритетов:

public class NotificationPool extends ThreadPoolTaskExecutor {
     @Override
     protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
          return new PriorityBlockingQueue<>(queueCapacity);
        }
    }

Вот класс задач Уведомления:

public class NotificationTask implements Runnable, Comparable<NotificationTask> {

    private final NotificationService notificationService;
    private final Notification notification;

    public NotificationService(NotificationService notificationService, 
                               Notification notification) {
        this.notificationService = notificationService;
        this.notification = notification;
    }

    @Override
    public int compareTo(NotificationTask task) {
        return notification.getTimestamp().compareTo(task.getTimestamp());
    }

    @Override
    public void run() {
        notificationService.processNotification(notification);
    }
}

И вот как я его выполняю:

asynchronousService.executeAsynchronously(new NotificationTask (notificationService, notification));

Это работает нормальнос ограниченной очередью, но мне нужна неограниченная очередь.Как видно из xml-конфигурации, строка для определения емкости очереди закомментирована:

<!--<property name="queueCapacity" value="100"/>-->

Однако, если я это сделаю, то получу исключение OutOfMemoryException.Кажется, что он пытается создать неограниченную очередь прямо в начале приложения.Однако Executor-Service позволяет нам использовать неограниченные очереди - но я не знаю, как это сделать здесь.

1 Ответ

0 голосов
/ 04 июня 2018

Согласно документу java:

Неограниченная очередь блокировки, которая использует те же правила упорядочения, что и класс PriorityQueue, и предоставляет блокирующие операции поиска.Хотя эта очередь логически не ограничена, попытки добавления могут завершиться неудачей из-за исчерпания ресурсов (что приводит к OutOfMemoryError)

Таким образом, по сути, она не ограничена, и вам не нужно заботиться о размере очереди.

Однако очередь поддерживается массивом:

public PriorityBlockingQueue(int initialCapacity,
                             Comparator<? super E> comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    // Here is the initialization of backed array
    this.queue = new Object[initialCapacity];
}

Таким образом, вам необходимо предоставить разумный начальный размер очереди.После этого все готово.

Однако, как уже говорилось, по мере того, как появляется все больше и больше элементов, очередь может выдать исключение OutOfMemory при попытке увеличить внутренний массив, если внутренний массив заполнен.

private void tryGrow(Object[] array, int oldCap)

Но это очень маловероятно, если ваш проект не выдаст миллионы уведомлений за короткое время

...