RxJava: PublishSubject действует синхронно - PullRequest
0 голосов
/ 03 июля 2018

Мне нужна функциональность, которая позволяла бы асинхронно отправлять сообщения на мой PublishSubject и обрабатывать их в определенном темпе (фактически по одному) через ConnectableObservable. К сожалению, кажется, что вызов onNext из PublishSubject не прекращается, пока базовый Subscriber не обработает сообщение.

Требуется несколько секунд для обработки каждого сообщения, и в режиме отладки я вижу, что оно выполняется до того, как вызов метода, который отправляет сообщение в PublishSubject, удаляется из стека - "After push..." всегда появляется в консоли после внутренних журналов внутри Subscriber ...

Итак, у меня есть эта RestEndpoint:

@PUT
@Path("{id}")
@TokenAuthenticated
public Response postResource(@PathParam(value="id") final String extId) {
    executorService.execute(new Runnable() {

        @Override
        public void run() {
            try {
                Message metadata = processor.apply(extId);
                log.info("Before push...");
                dataImporter.pushData(metadata);
                log.info("After push...");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
    return Response.ok("Request received successfully").build();

}

Вот конструктор DataImporter:

public DataImporter(final String configFile) {
        dataToImportSubject = PublishSubject.create();
        dataToImportObservable = dataToImportSubject.publish();
        dataToImportObservable.connect();
        dataToImportObservable
            .onBackpressureBuffer(1, new Action0() {

                @Override
                public void call() {
                    logger.debug("Buffer full...");
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Subscriber<Message>() {

                @Override
                public void onCompleted() {
                    // TODO Auto-generated method stub

                }

                @Override
                public void onError(Throwable e) {
                    logger.error("Error importing "+e.getMessage());
                }

                @Override
                public void onNext(Message value) {
                    request(1);
                    importResult(configFile, value);
                }

                @Override
                public void onStart() {
                    request(1);
                }
            });
    }

Тогда pushData DataImporter просто нажимает на PublishSubject onNext метод ..:

public void pushData(Message metadata) {
    dataToImportSubject.onNext(metadata);       
}

А вот и декларация PublishSubject и ConnectableObservable:

public class DataImporter implements ImporterProxy{

    private final PublishSubject<Message> dataToImportSubject;
    private final ConnectableObservable<Message> dataToImportObservable;

Ответы [ 2 ]

0 голосов
/ 04 июля 2018

PublishSubject s отправляют своим потребителям по нити оригинала onXXX, звонят:

JavaDocs

Планировщик :

PublishSubject не работает по умолчанию для определенного Scheduler, и Observer s получают уведомление в потоке, что были вызваны соответствующие onXXX методы.

Вы должны переместить обработку в другой поток с помощью observeOn, потому что observeOn может переместить вызовы onXXX в другой поток.

subscribeOn не оказывает никакого практического влияния на Subject с в целом, поскольку влияет только на поток подписки и не модулирует последующие onXXX вызовы этих субъектов.

0 голосов
/ 03 июля 2018

RxJava по умолчанию является синхронным. Вам необходимо ввести операторов в цепочку наблюдателей для выполнения действий в других потоках. Когда вы прочитаете документацию по каждому оператору в Observable, вы увидите операторы типа «... не работает в конкретном планировщике» - это указывает на то, что данные передаются через этот оператор синхронно.

Чтобы заставить цепочку наблюдателей выполнять действия в других потоках, вы можете использовать оператор, такой как subscribeOn(), с планировщиком, чтобы операции выполнялись с этим планировщиком. В вашем примере вы, вероятно, захотите использовать Schedulers.io() для обеспечения фонового потока.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...