Реализация пользовательских промежуточных операций на Java 8 Streams - PullRequest
8 голосов
/ 14 июня 2019

Я пытаюсь понять, как реализовать пользовательскую промежуточную операцию в потоке Java 8.И кажется, что я заблокирован: (

В частности, я хочу взять поток и вернуть каждую запись до , включая первую, которая имеет определенное значение. И я хочучтобы прекратить генерировать что-либо после этого - сделать его коротким замыканием.

Он запускает серию проверочных проверок входных данных. Я хочу остановиться на первой ошибке, если она есть, но я хочу сопоставить предупрежденияи потому что эти проверки могут быть дорогостоящими - например, при поиске в базе данных - я хочу запустить только необходимый минимальный набор.

Таким образом, код будет выглядеть примерно так:

Optional<ValidationResult> result = validators.stream()
    .map(validator -> validator.validate(data))
    .takeUntil(result -> result.isError()) // This is the bit I can't do
    .reduce(new ValidationResult(), ::mergeResults);

кажется , что я должен быть в состоянии что-то сделать с ReferencePipeline.StatefulOp, за исключением того, что это все область действия пакета, и поэтому я не могу его расширить. И поэтому мне интересно, какой правильный способдобиться этого? Или, если это вообще возможно?

Также обратите внимание - это должно быть в Java 8, а не в 9+, поскольку мы еще не пришли по различным не связанным с этим причинам.

Приветствия

Ответы [ 4 ]

1 голос
/ 14 июня 2019

Я признаю, что код мудрый, ответ Хольгера намного более сексуален, но может быть, это как-то легче читать:

public static <T> Stream<T> takeUntilIncluding(Stream<T> s, Predicate<? super T> condition) {

    class Box implements Consumer<T> {

        boolean stop = false;

        T t;

        @Override
        public void accept(T t) {
            this.t = t;
        }
    }

    Box box = new Box();

    Spliterator<T> original = s.spliterator();

    return StreamSupport.stream(new AbstractSpliterator<>(
        original.estimateSize(),
        original.characteristics() & ~(Spliterator.SIZED | Spliterator.SUBSIZED)) {

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {

            if (!box.stop && original.tryAdvance(box) && condition.test(box.t)) {
                action.accept(box.t);
                return true;
            }

            box.stop = true;

            return false;
        }
    }, s.isParallel());

}
1 голос
/ 14 июня 2019

Как правило, пользовательские операции должны иметь дело с интерфейсом Spliterator. Он расширяет концепцию Iterator, добавляя характеристики и информацию о размере, а также возможность разделять часть элементов в качестве другого сплитератора (отсюда и его название). Это также упрощает итерационную логику, поскольку требуется только один метод.

public static <T> Stream<T> takeWhile(Stream<T> s, Predicate<? super T> condition) {
    boolean parallel = s.isParallel();
    Spliterator<T> spliterator = s.spliterator();
    return StreamSupport.stream(new Spliterators.AbstractSpliterator<T>(
        spliterator.estimateSize(),
        spliterator.characteristics()&~(Spliterator.SIZED|Spliterator.SUBSIZED)) {
            boolean active = true;
            Consumer<? super T> current;
            Consumer<T> adapter = t -> {
                if((active = condition.test(t))) current.accept(t);
            };

            @Override
            public boolean tryAdvance(Consumer<? super T> action) {
                if(!active) return false;
                current = action;
                try {
                    return spliterator.tryAdvance(adapter) && active;
                }
                finally {
                    current = null;
                }
            }
        }, parallel).onClose(s::close);
}

Чтобы сохранить свойства потока, мы сначала запрашиваем параллельное состояние, чтобы восстановить его для нового потока. Кроме того, мы регистрируем действие закрытия, которое закроет исходный поток.

Основная работа заключается в реализации Spliterator, декорирующего разделитель предыдущего состояния потока.

Характеристики сохраняются, за исключением SIZED и SUBSIZED, так как наша операция приводит к непредсказуемому размеру. Оригинальный размер все еще передается, теперь он будет использоваться в качестве оценки.

Это решение сохраняет Consumer, переданное tryAdvance на время операции, чтобы иметь возможность использовать одного и того же потребителя адаптера, избегая создания нового для каждой итерации. Это работает, поскольку гарантируется, что tryAdvance никогда не вызывается одновременно.

Параллелизм осуществляется через разбиение, которое наследуется от AbstractSpliterator. Эта унаследованная реализация будет буферизовать некоторые элементы, что является разумным, поскольку реализация лучшей стратегии для такой операции, как takeWhile, действительно сложна.

Так что вы можете использовать его как

    takeWhile(Stream.of("foo", "bar", "baz", "hello", "world"), s -> s.length() == 3)
        .forEach(System.out::println);

который напечатает

foo
bar
baz

или

takeWhile(Stream.of("foo", "bar", "baz", "hello", "world")
    .peek(s -> System.out.println("before takeWhile: "+s)), s -> s.length() == 3)
    .peek(s -> System.out.println("after takeWhile: "+s))
    .forEach(System.out::println);

который напечатает

before takeWhile: foo
after takeWhile: foo
foo
before takeWhile: bar
after takeWhile: bar
bar
before takeWhile: baz
after takeWhile: baz
baz
before takeWhile: hello

, который показывает, что он не обрабатывает больше, чем необходимо. Перед этапом takeWhile мы должны встретиться с первым несовпадающим элементом, после этого мы встречаемся только с элементами до этого.

0 голосов
/ 14 июня 2019

Вы можете использовать следующую структуру;

AtomicBoolean gateKeeper = new AtomicBoolean(true);    
Optional<Foo> result = validators.stream()
    .filter(validator -> gateKeeper.get() 
                && gateKeeper.compareAndSet(true, !validator.validate(data).isError()) 
                && gateKeeper.get())
    .reduce(...) //have the first n non-error validators here

Фильтр с gateKeeper действует как логика короткого замыкания и продолжает работать, пока не встретит первый случай isError() == true, не отклонит его и не закроет двери для других вызовов validate() с этого момента. Это выглядит немного сумасшедшим, но это намного проще, чем другие пользовательские реализации и может отлично работать, если это соответствует вашим требованиям.

Не уверен на 100%, если это полезно, так как я игнорирую результат validator.validate(data), кроме результата isError(), и тот факт, что он принадлежит тому, что validator в списке.

0 голосов
/ 14 июня 2019

Вы можете сделать это с помощью трюка:

List<ValidationResult> res = new ArrayList<>(); // Can modify it with your `mergeResults` instead of list

Optional<ValidationResult> result = validators.stream()
    .map(validator -> validator.validate(data))
    .map(v -> {
       res.add(v);
       return v;
    })
    .filter(result -> result.isError())
    .findFirst();

List<ValidationResult> res будет содержать ваши заинтересованные значения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...