Безопасно ли изменять значение в вызове RxJava doOnNext ()? - PullRequest
0 голосов
/ 06 сентября 2018

Я работаю с базой кода, где у нас много шаблонов, похожих на:

getPoJoObservableFromSomewhere() // no guarantees about the threading/scheduling here
  .doOnNext(pojo -> System.out.println("got pojo: " + pojo.toString())
  .doOnNext(pojo -> pojo.setName("my new pojo"))
  .observeOn(newThreadScheduler)
  .flatMap(pojo -> doSomethingWithPojo(pojo)) // no guarantees about the threading/scheduling in doSomethingWithPojo()
  .subscribe(pojo -> System.out.println("Got a pojo: " + pojo.toString()));

Или еще более вероятно:

Pojo myPojo = new Pojo("old name");
getNameFromSomewhere() // intentionally no guarantee what scheduler or thread this is observed on for this example
   .doOnNext(pojo -> pojo.setName("new name"))
   .flatMap(pojo -> doSomethingWithPojo(pojo)) // again, no guarantees about the threading inside this call
   .subscribe(pojo -> (), error -> System.out.println("Error handling goes here"));

Предполагается, что объекты Pojo представляют собой обычные объекты данных без побочных эффектов, которые выглядят примерно так:

class Pojo {
     private String name;
     public Pojo(String name) { this.name = name; }
     public String getName() { return name; }
     public void setName(String name) { this.name = name; }
}

Безопасен ли этот шаблон от незначительных ошибок потоков?

Самое большое, что я могу вспомнить, это барьеры памяти, но я не настолько опытен, чтобы игнорировать мысль о том, что могут быть вещи, о которых я не знаю.

Например, будут ли установлены барьеры памяти для того, чтобы после мутации в вызове doOnNext() все записи были завершены и были подхвачены тем, что происходит в doSomethingWithPojo() внутри flatMap?

В двух словах: безопасны ли эти шаблоны?

(Наша кодовая база - RxJava 1, но этот вопрос в равной степени относится и к RxJava 2)

1 Ответ

0 голосов
/ 06 сентября 2018

Это зависит от , если восходящий поток одновременно испускает элемент . Этот SO показывает пример того, как Subject может действовать без поточной защиты.

    AtomicInteger counter = new AtomicInteger();

    // Thread-safe
    // SerializedSubject<Object, Object> subject = PublishSubject.create().toSerialized();

    // Not Thread Safe
    PublishSubject<Object> subject = PublishSubject.create();

    Action1<Object> print = (x) -> System.out.println(Thread.currentThread().getName() + " " + counter);

    Consumer<Integer> sleep = (s) -> {
        try {
            Thread.sleep(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    };

    subject
            .doOnNext(i -> counter.incrementAndGet())
            .doOnNext(i -> counter.decrementAndGet())
            .doOnNext(print)
            .filter(i -> counter.get() != 0)
            .doOnNext(i -> {
                        throw new NullPointerException("Concurrency detected");
                    }
            )
            .subscribe();

    Runnable r = () -> {
        for (int i = 0; i < 100000; i++) {
            sleep.accept(1);
            subject.onNext(i);
        }
    };

    ExecutorService pool = Executors.newFixedThreadPool(2);
    pool.execute(r);
    pool.execute(r);
...