Приоритет очереди реализации Spring TaskExecutor - PullRequest
0 голосов
/ 28 мая 2018

Приложение, над которым я работаю, получает уведомления от внешних систем, которые я хочу обработать.

До сих пор у меня есть следующая реализация:

    public class AsynchronousServiceImpl implements AsynchronousService {

    private TaskExecutor taskExecutor;

    @Override
    public void executeAsynchronously(Runnable task) {
        taskExecutor.execute(task);
    }

    @Required
    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }
}

конфигурация пружины (мне нужен только 1 поток, так как я не хочу выполнять уведомления параллельно из-за некоторых унаследованных проблем, которые сложныизменить)

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="1"/>
    <property name="maxPoolSize" value="1"/>
    <property name="WaitForTasksToCompleteOnShutdown" value="true"/>
</bean>

Здесь я выполняю затем код:

 asynchronousService.executeAsynchronously(new Runnable() {
     @Override
     public void run() {
         someMethod.processNotification(notification)
      }
   });

Этот объект уведомления, который у меня есть, содержит поле метки времени.Я хочу расставить приоритеты для уведомлений в очереди этим полем (я думаю, что Spring по умолчанию использует неограниченную очередь, что мне больше подходит, поскольку мне нужна неограниченная очередь)

Могу ли я каким-то образом интегрировать это в мое весеннее приложениене осуществляя это вручную с нуля?Поэтому я хочу отсортировать taskss (runnable-objects) в очереди на основе поля метки времени объекта уведомления (именно этот объект я передаю методу processNotification)

1 Ответ

0 голосов
/ 28 мая 2018

ThreadPoolTaskExecutor поддерживается BlockingQueue:

protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
    if (queueCapacity > 0) {
        return new LinkedBlockingQueue<Runnable>(queueCapacity);
    }
    else {
        return new SynchronousQueue<Runnable>();
    }
}

Если вы хотите заказать задачу, вам нужно переопределить эту функцию, чтобы разрешить упорядочение по приоритетам:

public class YourPool extends ThreadPoolTaskExecutor {
    @Override
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
        return new PriorityBlockingQueue<>(queueCapacity);
    }
}

Отправленное вами задание должно быть сопоставимо:

public class YourTask implements Runnable, Comparable<YourTask> {
    private Notification notification;

    public YourTask(Notification notification) {
        this.notification = notification;
    }
    @Override
    public void run() {
        someMethod.processNotification(notification)
    }

    @Override
    public int compareTo(B other) {
        // Here you implement the priority
        return notification.getTimestamp().compareTo(other.notification.getTimestamp());
    }
}

Затем отправьте задание:

asynchronousService.executeAsynchronously(new YourTask(notificationX));
...