Поставщик потока получает ошибку «поток уже обработан или закрыт» - PullRequest
1 голос
/ 05 ноября 2019

Несмотря на то, что я использую поставщика для своих потоков и использую Supplier.Get () каждый раз, когда я хочу получить свой strem и выполнить на нем терминальную операцию, я все еще получаю «поток уже был обработан или закрыт»исключение. Кто-нибудь может, пожалуйста, взглянуть на мой код и предложить, что я делаю неправильно?

Метод, где выдается исключение:

private static void printMyDetails(Supplier<Stream<Result>> mySupplier, String myStatus) {
        checkNotNull(mySupplier);
        checkArgument(isNotEmpty(myStatus), "Invalid My status");

        if (mySupplier.get().noneMatch(result -> true)) { //<-This is where the exception is thrown
            if (log.isInfoEnabled()) {
                log.info("Number of My with status '{}': {}", My, 0);
            }
        } else {
            log.info("Number of My with status '{}': {}", myStatus, mySupplier.get().count());
            log.info("Details of My(s) with status: '{}'", myStatus);
            mySupplier.get().sorted().forEach(Utils::printMyNameAndDetails);
        }
    }

Место, которое вызывает вышеуказанный метод:

rb.keySet().stream().filter(key -> containsIgnoreCase(key, "status")).map(rb::getString)
                .filter(StringUtils::isNotEmpty).forEach(status -> {
            var resultsWithExpectedStatusSupplier = requireNonNull(getResultsWithExpectedStatusSupplier(results, status));
            resultsWithExpectedStatusSupplier.ifPresentOrElse(streamSupplier -> printMyDetails(streamSupplier, status), () -> {
                if (log.isInfoEnabled())
                    log.info("0 My with status: {}", status);
            });
        });

Поставщик потока:

private static Optional<Supplier<Stream<Result>>> getResultsWithExpectedStatusSupplier(
            @NotNull List<Result> results, @NotNull String expectedStatus) {
        checkArgument(!results.isEmpty(), "Results list is empty");
        checkArgument(isNotEmpty(expectedStatus), "Invalid expected status");

        var resultStreamWithExpectedStatus = requireNonNull(results.stream().filter(result -> ofNullable(result).map(Result::getStatus)
                .allMatch(s -> isNotEmpty(s) && s.equalsIgnoreCase(expectedStatus))));
        return resultStreamWithExpectedStatus.count() == 0 ? Optional.empty() : Optional.of(() -> resultStreamWithExpectedStatus);
    }

Ответы [ 2 ]

2 голосов
/ 05 ноября 2019

Вы можете использовать поток только один раз. Похоже, что Поставщик всегда дает один и тот же поток снова и снова. После первой операции терминала поток сливается;Поток от Поставщика должен всегда быть новым Потоком.

1 голос
/ 05 ноября 2019

Общая проблема заключается в том, как сказал Кристиан Улленбум: поток уже израсходован. Точное местоположение в вашем коде - это вызов resultStreamWithExpectedStatus.count() в методе getResultsWithExpectedStatusSupplier, так как Stream.count - операция сокращения / терминала, которая потребляет поток.

Как указано, например, в этом ответе , вы не можете получить размер потоков без его использования. Исправьте это, сохранив отфильтрованные элементы в коллекции (например, Collectors.toList), запросив там размер и вернув саму коллекцию, а не поток?

Что касается примечания, я думаю, вы неправильно используете Optional aслишком многоКод может быть проще, пропуская пустые потоки (или даже лучше: пропустить пустую, отфильтрованную коллекцию).

...