Observable.just (doSomeLongStuff ()) запустите doSomeLongStuff (), прежде чем я подпишусь на наблюдаемый - PullRequest
0 голосов
/ 10 июля 2019

У меня глупая проблема с RxJava2.

Мне нужно запустить две длинные операции одновременно.Я знаю, что должен использовать Observable.zip () и использую его.

Проблема в том, что мои длинные операции выполняются одна за другой, и другая проблема в том, что мои длинные операции начинаются до того, как я на них подписываюсь.

Давайте представим, что это моя длинная операция, которую я должен запустить async.

private String doSomethingLong() {
        Random rand = new Random();
        int value = rand.nextInt(5);
        Timber.i("Do something for [%d] sec [%s]", value, Thread.currentThread().getName());
        try {
            Thread.sleep(value * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
            return String.format(Locale.getDefault(), "Exception [%s]", e.getMessage());
        }
        return String.format(Locale.getDefault(),"Job for [%d] seconds", value);
    }

И пусть есть метод, подобный test (), который попытается сделать его параллельным:

public void test() {

        final long started = System.currentTimeMillis();
        Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
        Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());


        Observable.zip(just1, just2, new Func2<String, String, Combined>() {
            @Override
            public Combined call(String s, String s2) {
                return new Combined(s, s2);
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Combined combined) {
                long total = System.currentTimeMillis() - started;
                Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
            }
        });

    }

Когда я пытаюсь запустить это, я замечаю, что две наблюдаемые just1 и just2 работают одна за другой ... И это меня смутило ...

Но есть еще один персонал, который смутил меня больше... Я прокомментировал Observable.zip и заметил, что just1 и just2 запустили метод doSomethingLong () до того, как я подписался на них ...

Позвольте мне показать:

public void test() {

        final long started = System.currentTimeMillis();
        Observable<String> just1 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());
        Observable<String> just2 = Observable.just(doSomethingLong()).subscribeOn(Schedulers.newThread());


//        Observable.zip(just1, just2, new Func2<String, String, Combined>() {
//            @Override
//            public Combined call(String s, String s2) {
//                return new Combined(s, s2);
//            }
//        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Combined>() {
//            @Override
//            public void onCompleted() {
//
//            }
//
//            @Override
//            public void onError(Throwable e) {
//
//            }
//
//            @Override
//            public void onNext(Combined combined) {
//                long total = System.currentTimeMillis() - started;
//                Timber.i("TOTAL [%d]ms [%s]", total, combined.toString());
//            }
//        });

    }

Этот код делает почти то же самое- он запускается два раза doSomethingLong () один за другим ...

Что я ожидаю: 1. Мне нужно, чтобы методы doSomethingLong () работали параллельно 2. Я прошу объяснить, почему эти методы выполняются раньшеЯ запускаю subscrя их.3. Как я должен написать мне код хорошо в этой ситуации.Я хочу, чтобы методы doSomethingLong () не вызывались до того, как я на них подпишусь.

Большое спасибо.Надеюсь, я хорошо объясню проблему.

Ответы [ 2 ]

3 голосов
/ 11 июля 2019

Observable.just ничего не запускается при подписке.Он генерирует элементы при подписке, но ваш doSomethingLong запустится, как только вы передадите его в качестве аргумента.Это нормально, и так работает язык.

То, что вы ищете, это способ сказать, что возвращайте это, когда мы подписываемся, но также запускаем его только в это время и, надеюсь, в фоновом потоке.

На это есть несколько ответов, вот некоторые из них:

Использование defer

Есть оператор с именем defer, который принимает лямбду, которая будетвыполняется после подписки:

Observable.defer(() ->  doSomethingLong())

Это будет выполняться только doSomethingLong при подписке

Использование fromCallable

Вы можете создать наблюдаемое излямбдаЭто известно как fromCallable:

Observable.fromCallable(() -> doSomethingLong())

Аналогично, оно будет запускаться doSomethingLong только при подписке

Использование create

Iдумаю, что это, пожалуй, самый обескураженный способ сделать это, поскольку есть несколько вещей, с которыми вам приходится иметь дело, но я думаю, что для полноты картины можно упомянуть:

Observable.create( emitter -> {
    if(emitter.isDisposed()) return;

    emitter.onNext(doSomethingLong());
    emitter.onComplete();
});

Опять же, я 'Я уверен, что есть больше способов сделать это.Я просто хотел объяснить проблему и дать несколько вариантов.

0 голосов
/ 11 июля 2019

Создайте свои наблюдаемые как Observable.fromCallable {}.А вместо почтового индекса используйте функциюlateLatest ()

Документы: http://reactivex.io/RxJava/javadoc/io/reactivex/Observable.html#fromCallable-java.util.concurrent.Callable- http://reactivex.io/documentation/operators/combinelatest.html

...