Создание непрерывного потока событий, отсортированных по количеству гостей за событие в потоке RSVP API Meetup - PullRequest
0 голосов
/ 28 апреля 2020

Я изучаю Spring Webflux и Reactor. Я создаю приложение, которое подключается к потоку Meetup API и предоставляет поток событий, отсортированных в порядке убывания гостей. У меня возникли проблемы с агрегацией количества гостей по каждому событию для создания непрерывного потока. Любая помощь будет высоко оценена из гостейCount

Как сохранить обновленный счет? Мне нужно поддерживать постоянное обновление отображения eventId-> guestCount вместе с другими деталями (место проведения и название события). Я не уверен, как этого добиться от groupBy. Я попытался сохранить внешнюю карту и вернуть отсортированные значения из этой карты в результате. Но это не работает, так как карта пуста с самого начала и, следовательно, пустой поток возвращается немедленно

Class RSVP
{
Event event;
Integer guestCount;
Venue venue;
}

/*Logic to process request*/
Map<String,ProcessedResponse> mapper= new HashMap<String,ProcessedResponse>();

Function<ResponseBean,ProcessedResponse> guestsTracker = a -> {
            int newGuestCount = a.getGuests()+1;
            String eventId = a.getEvent().getEventId();
            if(tracker.containsKey(eventId)) {
                ProcessedResponsestate = tracker.get(eventId);
                state.setGuests(state.getGuests()+newGuestCount);
            }else {
                tracker.put(eventId, new ProcessedResponse(a.getVenue(), eventId, a.getEvent().getEventName(), newGuestCount));
            }
            return tracker.get(eventId);
        };

Flux<ProcessedResponse> processedRsvpFlux = streamwebClient.get().uri(meetup_rsvp_stream_uriStr).exchange()
                .flatMapMany(clientResponse -> clientResponse.bodyToFlux(ResponseBean.class)).filter(predicate)
                .map(guestsTracker)));

Flux<ProcessedResponse> result= Flux.fromIterable(tracker.values()).sort(sorter);
return result;

Попытка с функцией groupBy Flux: я понимаю, что мне нужно объединить входящий поток так, чтобы для каждого Получено событие Eventid, если оно уже получено, обновите значение guestCount Применить сортировку к полученному GroupedFlux для сортировки в порядке убывания guestCount

Я пытался использовать вариант groupBy (keyMapper, valueMapper). При этом я знаю, что keyMapper может отображаться в eventId как ключ. Я не понимаю, как я могу реализовать valueMapper, так как если это rsvp для того же события происходит снова, мне нужно обновить счетчик для события и не создавать новую запись.

...