Что такое групповой поток и как именно мы с ним работаем? - PullRequest
0 голосов
/ 07 июня 2019

Я работаю над потоком какого-то объекта, скажем, Flux < MovieReservation >. Он содержит информацию, такую ​​как идентификатор фильма, название, время, название и т. Д. Поэтому я хотел извлечь информацию, которая может помочь в создании нового
Flux < MovieShowDetail >. Мое намерение состояло в том, чтобы сгруппировать все резервирования по идентификатору фильма и разбить поток на множество меньших и нескольких потоков (потоков, если это вообще так). Что-то вроде

Flux {
   movie1 -> Flux<MovieShowDetail>
   movie2 -> Flux<MovieShowDetail>
   ... and so on
}

Итак, я столкнулся с этим методом groupBy, который должен делать только что-то подобное. Тем не менее, документация действительно лишена содержания, особенно о том, как перебирать каждый фильм и соответствующий поток.

Более того, когда я пытаюсь учиться методом проб и ошибок, обработка останавливается после работы над операцией перед методом groupBy.

Я пытался сделать

fluxOfSomething
.groupBy( movieReservation -> movieReservation.getMovieId ,
movieReservation -> movieReservation) 

, чтобы я мог перебирать каждый поток и создавать новый поток MovieShowDetail. Однако обработка никогда не попадает в этот блок. Я пробовал что-то записывать, но поток так и не вошел.

flux
    .map( movieSomething -> do something)
    .groupBy( movieReservation -> 
        movieReservation.getMovieId , movieReservation ->
    movieReservation)
    .subscribe("This text doesn't get printed");

Мне действительно нужно как можно больше информации об этом.

1 Ответ

0 голосов
/ 10 июня 2019

groupBy создает Flux<Flux<T>> (или, точнее, Flux<GroupedFlux<T>>, который предоставляет ключ каждой группы).

A GroupedFlux, как и Flux, должно быть подписано настать активным.Так что вам нужно как-то потреблять внутренний Flux, который производит groupBy.

Один из типичных способов сделать это - использовать flatMap, который уже принимает преобразование Function<T, Flux>.Функция может быть такой простой, как Function.identity() (но если вы хотите дополнительно обработать каждый элемент во внутреннем Flux, вам, вероятно, следует сделать это из flatMap Function (потому что ключ группы находится в области действия этой лямбды).

movieReservations
    .groupBy(MovieReservation::movieId)
    .flatMap(idFlux -> idFlux
        .collectList()
        .map(listOfReservations ->
            new MovieInformation(idFlux.key(), listOfReservations)
        )
    );
...