Если вы передаете объект с состоянием между реактивными лямбда-конструкциями, вам нужно заботиться о видимости потока? - PullRequest
1 голос
/ 06 июня 2019

Скажем, мы используем RxJava или Reactor, у нас есть объект, который содержит пару свойств, которые могут быть изменены с помощью сеттеров. Скажем, некоторые операции мы выполняем в разных пулах потоков. Мы используем этот объект состояния в нашем конвейере несколько раз и меняем его свойства. Нужно ли нам использовать volatile, чтобы гарантировать, что кэшированная версия не возвращается? Или существует механизм с последовательной природой конвейера, который гарантирует, что модифицированный объект, например, возвращается через .flatMap? Нужно ли, чтобы этот объект был неизменным и воссоздавался каждый раз при модификации?

Ответы [ 2 ]

2 голосов
/ 11 июня 2019

В случае, когда вы модифицируете один объект в нескольких конвейерах, учтите следующее:

// this example is in Kotlin but should be fairly analagous to java
fun main() {
    var count = 0

    Flowable.range(0, 1000)
        // splits the stream into "parallel pipelines"
        .parallel()
        .runOn(Schedulers.computation())
        .doAfterNext { count += 1 }
        // merge back into a single pipeline
        .sequential()
        .subscribe()

    println(count)
}

Результаты этого будут разными, но на моем компьютере он печатает что-либо из 600 - 850

В качестве примера два способа исправить это, используя AtomicInteger вместо используемого Int.Еще лучше было бы не хранить какую-либо переменную count и просто выполнить сложение для каждого параллельного процесса, а затем объединить результаты после того, как мы вернемся к одному потоку, с sequential, но я не уверен, что это применимо к вашему вопросу.

Напомним, что если вы изменяете любое количество объектов, но только в одном конвейере за раз, вам не нужно беспокоиться о проблемах с многопоточностью, так как изменения ограничены одним потоком.Если он разделен на несколько конвейеров одновременно , вам придется беспокоиться об этом так же, как обычно с любым многопоточным приложением.Решение может включать использование блокировок или синхронизированных объектов, доступных в Java, или может быть более реактивный способ избежать этих проблем (например, обработка только в одном конвейере за раз), однако без конкретного примера трудно сказать *.1015 *

0 голосов
/ 14 июня 2019

Всякий раз, когда у вас есть состояние, которое читается и записывается из разных потоков, вам нужно использовать один из механизмов синхронизации, чтобы гарантировать, что поток «видит» эффекты действий других потоков.

Без надлежащей синхронизации этоВозможно, что потоки видят частичные эффекты или эффекты в неожиданном порядке.

Подробнее см. https://docs.oracle.com/javase/specs/jls/se8/html/jls-17.html#jls-17.4.5.

Таким образом, вы должны убедиться, что весь доступ к вашему объекту состояния правильно синхронизирован.В качестве альтернативы вы можете избежать изменения состояния в вашем конвейере.Вместо этого используйте неизменные структуры данных.

...