Я действительно приложил усилия для реализации такого теста.
public class RxTest {
@Test
public void testConcurrency() {
Logout logout = new Logout();
AtomicInteger logoutCount = new AtomicInteger(0);
AtomicInteger errorCount = new AtomicInteger(0);
Completable logoutCompletable = Completable.fromAction(() -> logout.logout())
.subscribeOn(Schedulers.io())
.doOnComplete(() -> logoutCount.addAndGet(1))
.doOnError(error -> errorCount.addAndGet(1))
.onErrorComplete();
int tries = 50;
Completable[] arrayOfLogoutCompletables = new Completable[tries];
for (int i = 0; i < tries; i++) {
arrayOfLogoutCompletables[i] = logoutCompletable;
}
// run all in parallel and wait for all to finish
Completable.mergeArray(arrayOfLogoutCompletables).blockingAwait();
assertEquals(1,logoutCount.get());
assertEquals(tries - 1, errorCount.get());
}
private static class Logout {
private boolean loggedOut = false;
/**
* if you remove synchronized test will fail!!
*/
private synchronized void logout() {
if (loggedOut) throw new IllegalStateException();
loggedOut = true;
}
}
}
В тестах одновременно запускается до 50 Completables на Schedulers.io()
, каждый из которых вызывает logout()
.Есть счетчики, которые подсчитывают, сколько раз logout()
успешно и неудачно.blockingAwait
ожидает завершения всех завершений.Запустите этот тест 100 раз, и если вы удалите synchronized
, он не будет выполнен в 20% случаев.onErrorComplete()
существует для того, чтобы избежать распространения исключения до того, как все Completables завершат работу.
Интересный факт: если вы добавите getter и setter к loggedOut
и используете его внутри logout()
, то в большинстве случаев произойдет сбой без synchronized
.
Надеюсь, это поможет!