В случае, когда вы модифицируете один объект в нескольких конвейерах, учтите следующее:
// this example is in Kotlin but should be fairly analagous to java
fun main() {
var count = 0
Flowable.range(0, 1000)
// splits the stream into "parallel pipelines"
.parallel()
.runOn(Schedulers.computation())
.doAfterNext { count += 1 }
// merge back into a single pipeline
.sequential()
.subscribe()
println(count)
}
Результаты этого будут разными, но на моем компьютере он печатает что-либо из 600
- 850
В качестве примера два способа исправить это, используя AtomicInteger
вместо используемого Int
.Еще лучше было бы не хранить какую-либо переменную count и просто выполнить сложение для каждого параллельного процесса, а затем объединить результаты после того, как мы вернемся к одному потоку, с sequential
, но я не уверен, что это применимо к вашему вопросу.
Напомним, что если вы изменяете любое количество объектов, но только в одном конвейере за раз, вам не нужно беспокоиться о проблемах с многопоточностью, так как изменения ограничены одним потоком.Если он разделен на несколько конвейеров одновременно , вам придется беспокоиться об этом так же, как обычно с любым многопоточным приложением.Решение может включать использование блокировок или синхронизированных объектов, доступных в Java, или может быть более реактивный способ избежать этих проблем (например, обработка только в одном конвейере за раз), однако без конкретного примера трудно сказать *.1015 *