Executor Thread Pool - ограничить размер очереди и удалить самую старую очередь - PullRequest
0 голосов
/ 10 ноября 2018

Я использую фиксированный пул потоков для получателя сообщений в приложении весенней загрузки. Мой продюсер производит (намного) быстрее, чем продюсер может обработать сообщение, поэтому очередь из пула потоков кажется "затопляющей".

Как лучше всего ограничить размер очереди? Предполагаемое поведение очереди будет «если очередь заполнена, удалите заголовок и вставьте новый Runnable». Можно ли настроить пул потоков Executors следующим образом?

Ответы [ 2 ]

0 голосов
/ 10 ноября 2018

Это создаст пул потоков для вас с размером, который вы передаете.

ExecutorService service = Executors.newFixedThreadPool(THREAD_SIZE);

Это внутренне создает экземпляр ThreadPoolExecutor, который реализует ExecutorService.

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

Чтобы создать собственный пул thead, вы можете просто сделать.

ExecutorService service =   new ThreadPoolExecutor(5, 5,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(10));

Здесь мы можем указать размер очереди, используя перегруженный конструктор LinkedBlockingQueue.

    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

Надеюсь, это поможет.Ура !!!

0 голосов
/ 10 ноября 2018

ThreadPoolExecutor поддерживает эту функцию через ThreadPoolExecutor.DiscardOldestPolicy:

Обработчик отклоненных задач, который отбрасывает самый старый необработанный запрос и затем повторяет выполнение, если исполнитель не закрыт.down, и в этом случае задача отменяется.

Вам необходимо создать пул с этой политикой вручную, например:

int poolSize = ...;
int queueSize = ...;
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();

ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>(queueSize),
    handler);
...