Асинхронный эквивалент BlockingQueue в Java? - PullRequest
2 голосов
/ 17 октября 2019

Я ищу очередь, которая была бы асинхронным (неблокирующим) эквивалентом java.util.concurrent.BlockingQueue. Его интерфейс будет включать:

public interface AsynchronousBlockingQueue<E> {
    // - if the queue is empty, return a new CompletableFuture,
    //   that will be completed next time `add` is called
    // - if the queue is not empty, return a completed CompletableFuture,
         containing the first element of the list
    public CompletableFuture<E> poll();

    // if polling is in progress, complete the ongoing polling CompletableFuture.
    // otherwise, add the element to the queue
    public synchronized void add(E element);
}

Если это имеет значение, должен быть только один поток опроса, и опрос должен выполняться последовательно (poll не будет вызываться, когда опрос уже выполняется).

Я ожидал, что это уже существует в JVM, но не смог найти его, и, конечно, я бы предпочел использовать что-то из JVM, чем писать сам.

Еще одно ограничение, я 'Я застрял с Java 8 (хотя мне определенно интересно узнать, что существует в более поздних версиях).

1 Ответ

0 голосов
/ 21 октября 2019

Итак, наконец, я написал свой собственный класс ... Интересуют комментарии:)

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class AsynchronousBlockingQueue<E> {
    CompletableFuture<E> incompletePolling = null;
    Queue<E> elementsQueue = new LinkedList<>();

    // if the queue is empty, return a new CompletableFuture, that will be completed next time `add` is called
    // if the queue is not empty, return a completed CompletableFuture containing the first element of the list
    public synchronized CompletableFuture<E> poll() {
        // polling must be done sequentially, so this shouldn't be called if there is a poll ongoing.
        if (incompletePolling != null)
            throw new IllegalStateException("Polling is already ongoing");
        if (elementsQueue.isEmpty()) {
            incompletePolling = new CompletableFuture<>();
            return incompletePolling;
        }
        CompletableFuture<E> result = new CompletableFuture<>();
        result.complete(elementsQueue.poll());
        return result;
    }

    // if polling is in progress, complete the ongoing polling CompletableFuture.
    // otherwise, add the element to the queue
    public synchronized void add(E element) {
        if (incompletePolling != null) {
            CompletableFuture<E> result = incompletePolling;
            // removing must be done first because the completion could trigger code that needs the queue state to be valid
            incompletePolling = null;
            result.complete(element);
            return;
        }
        elementsQueue.add(element);
    }


}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...