Как безопасно использовать потоки Java безопасно без методов isFinite () и isOrdered ()? - PullRequest
18 голосов
/ 10 июня 2019

Возникает вопрос о том, должны ли java-методы возвращать Коллекции или Потоки , в которых Брайан Гетц отвечает, что даже для конечных последовательностей потоки обычно предпочтительнее.

Но мне кажется, что в настоящее время многие операции над потоками, которые приходят из других мест, не могут быть безопасно выполнены, и защитные средства защиты кода невозможны, потому что потоки не показывают, являются ли они бесконечными или неупорядоченными.

Если параллелизм был проблемой для операций, которые я хочу выполнить в Stream (), я могу вызвать isParallel () для проверки или последовательно, чтобы убедиться, что вычисления выполняются параллельно (если я не забуду).

Но если упорядоченность или ограниченность (размерность) были важны для безопасности моей программы, я не могу написать гарантии.

Предполагается, что я использую библиотеку, реализующую этот вымышленный интерфейс:

public interface CoordinateServer {
    public Stream<Integer> coordinates();
    // example implementations:
    // IntStream.range(0, 100).boxed()   // finite, ordered, sequential
    // final AtomicInteger atomic = new AtomicInteger();
    // Stream.generate(() -> atomic2.incrementAndGet()) // infinite, unordered, sequential
    // Stream.generate(() -> atomic2.incrementAndGet()).parallel() // infinite, unordered, parallel
}

Тогда какие операции я могу безопасно вызвать в этом потоке, чтобы написать правильный алгоритм?

Кажется, если я, возможно, захочу записать элементы в файл как побочный эффект, мне нужно беспокоиться о параллельности потока:

// if stream is parallel, which order will be written to file?
coordinates().peek(i -> {writeToFile(i)}).count();
// how should I remember to always add sequential() in  such cases?

А также, если он параллельный, на основе какого Threadpool он параллелен?

Если я хочу отсортировать поток (или другие операции без короткого замыкания), мне нужно как-то быть осторожным, так как он бесконечен:

coordinates().sorted().limit(1000).collect(toList()); // will this terminate?
coordinates().allMatch(x -> x > 0); // will this terminate?

Я могу наложить ограничение перед сортировкой, но какое магическое число должно быть, если я ожидаю конечный поток неизвестного размера?

Наконец, возможно, я хочу вычислить параллельно, чтобы сэкономить время, а затем собрать результат:

// will result list maintain the same order as sequential?
coordinates().map(i -> complexLookup(i)).parallel().collect(toList());

Но если поток не упорядочен (в этой версии библиотеки), результат может стать искаженным из-за параллельной обработки. Но как я могу защититься от этого, кроме как не использовать параллель (что отрицательно сказывается на производительности)?

Коллекции явно указывают на то, что они конечны или бесконечны, имеют ли они порядок или нет, и они не несут с собой режим обработки или пулы потоков. Это похоже на ценные свойства для API.

Кроме того, Потоки иногда могут быть закрыты , но чаще всего нет. Если я использую поток из метода (или из параметра метода), я должен обычно вызывать close?

Кроме того, потоки, возможно, уже были использованы, и было бы хорошо иметь возможность изящно обработать этот случай, поэтому было бы хорошо проверить, не был ли уже использован поток ;

Я хотел бы получить фрагмент кода, который можно использовать для проверки предположений о потоке перед его обработкой, например>

Stream<X> stream = fooLibrary.getStream();
Stream<X> safeStream = StreamPreconditions(
    stream, 
    /*maxThreshold or elements before IllegalArgumentException*/
    10_000,
    /* fail with IllegalArgumentException if not ordered */
    true
    )

1 Ответ

2 голосов
/ 26 июня 2019

Посмотрев немного (некоторые эксперименты и здесь ), насколько я вижу, невозможно определить, является ли поток конечным или нет.

Более того, иногда даже это не определяется, кроме как во время выполнения (например, в Java 11 - IntStream.generate(() -> 1).takeWhile(x -> externalCondition(x))).

Что вы можете сделать:

  1. Вы можете с уверенностью узнать, является ли оно конечным, несколькими способами (обратите внимание, что получение ложного на них не означает, что оно бесконечно, только то, что это может быть так):

    1. stream.spliterator().getExactSizeIfKnown() - если он имеет известный точный размер, он конечен, в противном случае он вернет -1.

    2. stream.spliterator().hasCharacteristics(Spliterator.SIZED) - если это SIZED, вернет true.

  2. Вы можете защитить себя, предполагая худшее (зависит от вашего случая).

    1. stream.sequential()/stream.parallel() - явно указать предпочитаемый тип потребления.
    2. С потенциально бесконечным потоком, предположите ваш худший случай в каждом сценарии.

      1. Например, предположим, что вы хотите прослушать поток твитов, пока не найдете один из них Venkat - это потенциально бесконечная операция, но вы хотели бы подождать, пока такой твит не будет найден. Поэтому в этом случае просто перейдите на stream.filter(tweet -> isByVenkat(tweet)).findAny() - он будет повторяться до тех пор, пока такой твит не появится (или навсегда).
      2. Другой сценарий, и, возможно, более распространенный сценарий, заключается в желании сделать что-то для всех элементов или только попробовать определенное количество времени (аналогично тайм-ауту). Для этого я рекомендую всегда вызывать stream.limit(x) перед вызовом вашей операции (collect или allMatch или аналогичный), где x - количество попыток, которые вы готовы терпеть.

После всего этого я просто упомяну, что я думаю, что возвращение потока, как правило, не очень хорошая идея, и я постараюсь избежать этого, если не будет больших преимуществ.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...