У меня простая проблема: я получаю кучу строк из базы данных SQL. Мне нужно свернуть эти строки, уменьшить их до нескольких json объектов. Используя rx Java, у меня есть:
Promise<Flowable<Data>> p = Promise.promise();
p.complete(observable
.groupBy(array -> array.getValue(0))
.flatMap(g -> extractData(g));
Дело в том, что когда данных больше определенного количества, этот код зависает. думаю Я понимаю, почему, из-за ограничения параллелизма flatMap
и потому, что мои группы никогда не заканчивают свою работу и ждут завершения основного наблюдаемого, прежде чем вернуться (для уменьшения необходимо, чтобы все значения в группе работали). Дело в том, что строки, которые я обрабатываю, гарантированно упорядочиваются с помощью определенного ключа c, такого же, что и тот, который я группирую.
Я ищу способ закрыть предыдущую группу, когда новая группа создана. Есть ли способ добиться этого?
Я думал, что groupByUntil
позволит это, но похоже, что теперь он объединен в метод groupBy
(по крайней мере, в Rx Java), и я не могу справиться найти способ, используя takeUntil
/ takeWhile
EDIT:
Просто осознавая, что без содержимого extractData
это довольно сложно понять, вот оно:
return group.reduce(new Data(), <business logic>).toFlowable()