Сворачивание последовательного потока в Java - PullRequest
0 голосов
/ 28 января 2019

Я привык к программированию в Scala, но мне нужно написать немного Java, и я пытаюсь выполнить эквивалент следующего фрагмента Scala:

trait Options[K, V] {
  def add(key: K , value: V): Options[K, V]
}

val options: Options[T, U] = ???
val elems: List[(T, U)] = ???
elems.foldLeft(options) {
  case (opts, (key, value)) => opts.add(key, value)
}

То есть ясворачивание элементов в elems внутри options, создание нового экземпляра на каждом шаге.

Я пытался использовать Java Stream#reduce:

interface Options<K, V> {
  Options<K, V> add(K key, V value);
}

Options<K, V> options = ???
Stream<Tuple2<K, V>> elems = ??? // This is Reactor's Tuple2
elems.reduce(options, (opts, opt) -> opts.add(opt), ???)

Я не знаю, каким должен быть комбинатор, и мне сложно представить, какие значения будут иметь его аргументы.Насколько я понимаю, combiner будет использоваться для объединения промежуточных значений, полученных параллельно в параллельном потоке.Меня не волнует обработка elems параллельно в моем случае.Другими словами, я ищу синхронную и последовательную версию Flux#reduce.

У меня нет контроля над API Options.elems не обязательно должен быть Stream.

1 Ответ

0 голосов
/ 28 января 2019

Невозможно написать сумматор с предоставленным вами интерфейсом.Проблема в том, что объединителю нужен способ объединения двух Options, но этого не существует.Единственное, что можно сделать с экземпляром Options, это добавить к нему одну пару.Я не могу получить какую-либо информацию из этого.Предположительно, он не может сделать что-то очень полезное.

Возможно, эта проблема связана с тем, что у Java нет признаков, и интерфейсы Java не являются подходящей заменой признаков.

Идиоматический способ Javaчтобы написать это просто стандартный болван для цикла:

Options<String, String> options = /*whatever*/;
List<Pair<String, String>> elems = /*whatever*/;
for (Pair<String, String> pair : elems)
{
    options = options.add(pair.getKey(), pair.getValue());
}

Если вы можете справиться с тем фактом, что вам никогда не удавалось использовать параллельный поток, вы можете воспользоваться тем, чтопоследовательный поток никогда не будет использовать объединитель.Таким образом, вы можете написать Collector, который определяет комбинатор, который просто сгенерирует исключение.

Options<String, String> foo = elems.stream()
    .collect(
        () -> options,
        (opt, pair) -> opt.add(pair.getKey(), pair.getValue()),
        (a, b) -> { throw new UnsupportedOperationException(); }
    );

Если вы действительно хотите использовать reduce, вам нужно изменить свой интерфейс либораскрыть некоторую информацию о парах ключ-значение, которые он содержит, или предоставить средство для добавления более чем одной пары ключ-значение одновременно.Например:

interface Options<K, V>
{
    Options<K, V> add(K key, V value);
    Options<K, V> add(Options<K, V> otherOptions);
}

Options<String, String> options = /*whatever*/;
List<Pair<String, String>> elems = /*whatever*/;

Options<String, String> foo = elems.stream()
    .reduce(
        options,
        (opt, pair) -> opt.add(pair.getKey(), pair.getValue()),
        Options::add
    );

Я сомневаюсь, что это то, что вы хотели услышать, но Scala и Java - это разные языки.Вы не должны ожидать, что все будет иметь точную параллель.Если бы это было так, то не было бы оснований для существования обоих языков.

...