RxJava: загадочное поведение - PullRequest
0 голосов
/ 30 октября 2018

Я хочу, чтобы методы в моем классе выполняли некоторый код в потоке ввода-вывода, но только один раз предмет, на который они подписываются, имеет определенное значение. Затем вызывающая сторона должна получить ответ в потоке пользовательского интерфейса Android.

Примерно так:

public class MyClass {

  private final Subject<Boolean, Boolean> subject;
  private final OtherClass otherObject;

  public MyClass(Subject<Boolean, Boolean> subject,
      OtherClass otherObject) {
    this.subject = subject;
    this.otherObject = otherObject;
  }

  public Observable<String> myMethod() {
    return waitForTrue(() -> otherObject.readFromDisk());
  }

  private <T> Observable<T> waitForTrue(Callable<T> callable) {
    return subject
        .first(value -> value)
        .flatMap(value -> Observable.fromCallable(callable))
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
  }
}

Это работает? Не уверен, поэтому я написал набор юнит-тестов, чтобы проверить их. Я обнаружил, что мои тестовые методы, хотя они всегда работали при запуске по одному, потерпели неудачу как часть набора.

На самом деле, я обнаружил, что если бы я ставил один и тот же тест дважды, он прошел бы первый раз, но не прошел второй!

public class MyClassTest {

  private TestScheduler ioScheduler;
  private TestScheduler androidScheduler;
  private TestSubscriber<String> testSubscriber;
  private MyClass objectUnderTest;

  @Before public void setup() {
    ioScheduler = new TestScheduler();
    androidScheduler = new TestScheduler();
    testSubscriber = new TestSubscriber<>();
    RxJavaHooks.reset();
    RxJavaHooks.setOnIOScheduler(scheduler -> ioScheduler);
    RxAndroidPlugins.getInstance().reset();
    RxAndroidPlugins.getInstance().registerSchedulersHook(
        new RxAndroidSchedulersHook() {
          @Override public Scheduler getMainThreadScheduler() {
            return androidScheduler;
          };
        });
    Subject<Boolean, Boolean> subject = BehaviorSubject.create(true);
    MyClass.OtherClass otherClass = mock(MyClass.OtherClass.class);
    when(otherClass.readFromDisk()).thenReturn("mike");;
    objectUnderTest = new MyClass(subject, otherClass);
  };

  @Test public void firstTest() {
    objectUnderTest.myMethod().subscribe(testSubscriber);
    ioScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    androidScheduler.advanceTimeBy(1, TimeUnit.SECONDS);
    testSubscriber.assertValueCount(1);
    // This passes
  };

  @Test public void secondTest() {
    firstTest();
    // This fails!
  };
}

Почему это происходит? И это ошибка в тестируемом классе, или тестовый код?

Я думал, что это может быть проблема с использованием RxJava 1.x, но у меня была похожая проблема с RxJava 2.x.

РЕДАКТИРОВАТЬ: Тесты не удалось из-за пропущенной строки в коде теста. Вы должны указать это в методе настройки:

AndroidSchedulers.reset()

потому что ловушка вызывается только один раз статическим инициализатором класса AndroidSchedulers.

1 Ответ

0 голосов
/ 30 октября 2018

subscribeOn не имеет практического влияния на Subject, потому что у них нет побочного эффекта подписки для перемещения в другой поток. Поэтому, когда они получают новый товар, они уведомляют своих потребителей о потоке вызывающей стороны. Перемещение элемента в другой поток должно быть сделано через observeOn:

private <T> Observable<T> waitForTrue(Callable<T> callable) {
    return subject
    .filter(value -> value)
    .take(1)
    .observeOn(Schedulers.io())
    .map(value -> callable.call())
    .observeOn(AndroidSchedulers.mainThread());
}

Также вам не нужно flatMap только для выполнения callable, достаточно карты.

...