Я пытаюсь заменить Amazon SQS той же функциональностью, реализованной в очереди Redis с использованием RedissonRxClient.Я пытаюсь понять, что происходит с очередью, которая производит элементы быстрее, чем процессор может их обработать.Я использую эту функцию для подписки на очередь:
/**
* Retrieves and removes continues stream of elements from the head of this queue.
* Waits for next element become available.
*
* @return stream of elements
*/
Flowable<V> takeElements();
Я наблюдаю за новым пулом потоков.В документации Executors.newFixedThreadPool
говорится следующее:
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue. At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.
Я не могу понять, что происходит, когда в очереди больше элементов, чем может обработать мой пул потоков,В идеале я хотел бы, чтобы продюсер заблокировал, а не потянул их вниз.Я предполагаю, что мой ExecutorService будет опускать их и помещать в очередь в памяти на локальном компьютере, пока не будет доступен поток в пуле потоков.
Если мое предположение верно - как я могу предотвратить вытягивание элементаиз очереди, пока поток не станет доступным?
Если мое предположение неверно - что произойдет, если очередь содержит больше элементов, чем может обработать мой подписчик, и возможно ли предотвратить вытягивание элементов очереди из очереди?пока поток не доступен?
Я использую Java 11, Redisson 3.11.1
Вот как выглядит мой подписчик:
composite = redisQueue.takeElements()
.observeOn(Schedulers.from(Executors.newFixedThreadPool(threadCount)))
.subscribe(message -> {
running.incrementAndGet();
try {
processMessage(message);
} catch (Exception e) {
//...
}
running.decrementAndGet();
}, err -> log.error(err.getCause()));
У меняТрудно разработать стратегию тестирования, чтобы понять это эмпирически.