Группировать по свойству объекта в поток Java - PullRequest
0 голосов
/ 20 сентября 2018

Учитывая следующую структуру данных Data и Flux<Data>, что является идиоматическим способом достижения группировки в серии списков на основе некоторого свойства:

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;

class Scratch {
    private static class Data {
        private Integer key;
        private String value;

        public Data(Integer key, String value) {
            this.key = key;
            this.value = value;
        }

        public Integer getKey() {
            return key;
        }

        public String getValue() {
            return value;
        }

        public static Data of(Integer key, String value) {
            return new Data(key, value);
        }

        @Override
        public String toString() {
            return value;
        }
    }

    public static void main(String[] args) {
        Flux<Data> test = Flux.just(
                Data.of(1, "Hello"),
                Data.of(1, "world"),
                Data.of(2, "How"),
                Data.of(2, "are"),
                Data.of(2, "you"),
                Data.of(3, "Bye"));
        test.bufferUntil(new Predicate<Data>() {
            Integer prev = null;
            @Override
            public boolean test(Data next) {
                boolean collect = prev != null && !Objects.equals(prev, next.getKey());
                prev = next.getKey();
                return collect;
            }
        }, true).subscribe(e -> System.out.println(e.toString()));
    }
} 

Вывод:

[Hello, world]
[How, are, you]
[Bye]

Я знаю о функции groupBy на Flux, но это снова дает мне Flux, а не список.Текущее решение, которое я описал выше, работает, но оно не кажется на 100% идиоматическим, потому что мне пришлось использовать анонимный класс вместо лямбды.Я мог бы использовать лямбду и AtomicReference снаружи от лямбды, но это тоже не на 100% правильно.Есть предложения?

1 Ответ

0 голосов
/ 02 ноября 2018

Вот решение с использованием оператора groupBy.Я сгруппировал данные по общему ключу.Оператор groupBy дает мне поток GroupedFlux.GroupedFlux является подклассом Flux, поэтому я применяю flatMap и преобразую отдельный groupedFlux в List<Data> с помощью оператора collectList.Таким образом, я получаю Flux<List<Data>>, на который затем подписываюсь и распечатываю по вашему запросу.

test.groupBy(Data::getKey)
                .flatMap(Flux::collectList)
                .subscribe(listOfStringsHavingDataWithSameKey -> System.out.println(listOfStringsHavingDataWithSameKey.toString()));

Извлекайте документы для Flux и GroupedFlux .

...