Проблема с ReplaySubject - PullRequest
0 голосов
/ 04 июля 2018

Не уверен, правильно ли я использую эту функцию. У меня есть много потоков, которые добавляют элементы в ReplaySubject. Я хочу, чтобы пакетная запись в БД. В основном потоке у меня есть такой код:

while ((x < y) ) {  //Still some items to process
    if (errorOccured == true) {
        replaySubject.onCompleted();
    }   

    //Process 100 messages at a time
    replaySubject.buffer(100).subscribe(list -> dbHandle.setBatch(list));   
} 

Однако RxJava сам по себе завершается с таким следом:

*at rx.subjects.ReplaySubject$ReplayUnboundedBuffer.drain(ReplaySubject.java:642)
at rx.subjects.ReplaySubject$ReplayProducer.request(ReplaySubject.java:1268)
at rx.Subscriber.setProducer(Subscriber.java:211)
at rx.subjects.ReplaySubject$ReplayState.call(ReplaySubject.java:384)
at rx.subjects.ReplaySubject$ReplayState.call(ReplaySubject.java:360)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)
at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)
at rx.Observable.subscribe(Observable.java:10352)
at rx.Observable.subscribe(Observable.java:10319)
at rx.Observable.subscribe(Observable.java:10159)
at com.model.JobStateAndStatsManager.areTheThreadsDone(JobStateAndStatsManager.java:54)
...