Я изучаю Spring Webflux и Reactor. Я создаю приложение, которое подключается к потоку Meetup API и предоставляет поток событий, отсортированных в порядке убывания гостей. У меня возникли проблемы с агрегацией количества гостей по каждому событию для создания непрерывного потока. Любая помощь будет высоко оценена из гостейCount
Как сохранить обновленный счет? Мне нужно поддерживать постоянное обновление отображения eventId-> guestCount вместе с другими деталями (место проведения и название события). Я не уверен, как этого добиться от groupBy. Я попытался сохранить внешнюю карту и вернуть отсортированные значения из этой карты в результате. Но это не работает, так как карта пуста с самого начала и, следовательно, пустой поток возвращается немедленно
Class RSVP
{
Event event;
Integer guestCount;
Venue venue;
}
/*Logic to process request*/
Map<String,ProcessedResponse> mapper= new HashMap<String,ProcessedResponse>();
Function<ResponseBean,ProcessedResponse> guestsTracker = a -> {
int newGuestCount = a.getGuests()+1;
String eventId = a.getEvent().getEventId();
if(tracker.containsKey(eventId)) {
ProcessedResponsestate = tracker.get(eventId);
state.setGuests(state.getGuests()+newGuestCount);
}else {
tracker.put(eventId, new ProcessedResponse(a.getVenue(), eventId, a.getEvent().getEventName(), newGuestCount));
}
return tracker.get(eventId);
};
Flux<ProcessedResponse> processedRsvpFlux = streamwebClient.get().uri(meetup_rsvp_stream_uriStr).exchange()
.flatMapMany(clientResponse -> clientResponse.bodyToFlux(ResponseBean.class)).filter(predicate)
.map(guestsTracker)));
Flux<ProcessedResponse> result= Flux.fromIterable(tracker.values()).sort(sorter);
return result;
Попытка с функцией groupBy Flux: я понимаю, что мне нужно объединить входящий поток так, чтобы для каждого Получено событие Eventid, если оно уже получено, обновите значение guestCount Применить сортировку к полученному GroupedFlux для сортировки в порядке убывания guestCount
Я пытался использовать вариант groupBy (keyMapper, valueMapper). При этом я знаю, что keyMapper может отображаться в eventId как ключ. Я не понимаю, как я могу реализовать valueMapper, так как если это rsvp для того же события происходит снова, мне нужно обновить счетчик для события и не создавать новую запись.