Redisson пытается заменить Amazon SQS - RBlockingQueueRx с фиксированным пулом потоков ExecutorService - PullRequest
0 голосов
/ 05 июля 2019

Я пытаюсь заменить Amazon SQS той же функциональностью, реализованной в очереди Redis с использованием RedissonRxClient.Я пытаюсь понять, что происходит с очередью, которая производит элементы быстрее, чем процессор может их обработать.Я использую эту функцию для подписки на очередь:

    /**
     * Retrieves and removes continues stream of elements from the head of this queue.
     * Waits for next element become available.
     * 
     * @return stream of elements
     */
    Flowable<V> takeElements();

Я наблюдаю за новым пулом потоков.В документации Executors.newFixedThreadPool говорится следующее:

* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue.  At any point, at most
* {@code nThreads} threads will be active processing tasks.
* If additional tasks are submitted when all threads are active,
* they will wait in the queue until a thread is available.

Я не могу понять, что происходит, когда в очереди больше элементов, чем может обработать мой пул потоков,В идеале я хотел бы, чтобы продюсер заблокировал, а не потянул их вниз.Я предполагаю, что мой ExecutorService будет опускать их и помещать в очередь в памяти на локальном компьютере, пока не будет доступен поток в пуле потоков.

Если мое предположение верно - как я могу предотвратить вытягивание элементаиз очереди, пока поток не станет доступным?

Если мое предположение неверно - что произойдет, если очередь содержит больше элементов, чем может обработать мой подписчик, и возможно ли предотвратить вытягивание элементов очереди из очереди?пока поток не доступен?

Я использую Java 11, Redisson 3.11.1

Вот как выглядит мой подписчик:

        composite = redisQueue.takeElements()
                .observeOn(Schedulers.from(Executors.newFixedThreadPool(threadCount)))
                .subscribe(message -> {
                    running.incrementAndGet();
                    try {
                        processMessage(message);
                    } catch (Exception e) {
                        //...
                    }
                    running.decrementAndGet();
                }, err -> log.error(err.getCause()));

У меняТрудно разработать стратегию тестирования, чтобы понять это эмпирически.

...