Вы можете поместить calculateSum
в поток, чтобы он выглядел лучше.
replayResponseFlowable.toMultiMap(response -> response.getValues())
.map(groups -> calculateSum(groups))
.subscribe(result -> {
print(result);
})
Поскольку вы только агрегируете счетчики, вам не нужноотслеживать все предметы.Вместо этого вы можете оставить только сумму отсчета для каждого значения.Это можно сделать с помощью оператора .collect
:
replayResponseFlowable
.collectInto(new HashMap<String, Integer>(), (group, response) -> {
group.merge(response.values, response.count, Integer::sum);
})
.subscribe(result -> {
print(result);
});
Это выведет
emitted=[{value=A, count=7}, {value=B, count=3}, {value=C, count=5}]
Если вы хотите получить результат в виде текущегоиспускает предметы, используйте .scan
:
replayResponseFlowable
.scan(new HashMap<String, Integer>(), (group, response) -> {
group.merge(response.values, response.count, Integer::sum);
return group;
})
.subscribe(result -> {
print(result);
});
Это напечатает:
emitted=[]
emitted=[{value=A, count=2}]
emitted=[{value=A, count=2}, {value=B, count=3}]
emitted=[{value=A, count=2}, {value=B, count=3}, {value=C, count=4}]
emitted=[{value=A, count=7}, {value=B, count=3}, {value=C, count=4}]
emitted=[{value=A, count=7}, {value=B, count=3}, {value=C, count=5}]