Поток RxJava повторяется дважды для каждой записи, полученной из базы данных - PullRequest
0 голосов
/ 09 июля 2019

У меня есть поток RxJava, который просматривает все записи в таблице, по одной, отправляет электронное письмо для каждого получателя в каждой записи и сохраняет статус того, была ли отправка успешной или нет.Этот цикл повторяется до тех пор, пока все получатели не будут прочитаны из базы данных и не отправлены им сообщения.У меня проблема в том, что код от начала потока (начиная с getMessagesToSend) до storeMessageSent вызывается дважды для каждого получателя (кроме первого получателя):

val msgToSendPublisher = BehaviorSubject.createDefault(0)

msgToSendPublisher.flatMap { startPos -> App.context.repository.getMessageToSend() }
    .flatMap { messageToSend ->
        App.context.repository.sendMessage(messageToSend)
            .doOnError {
                messageToSend.failureSending = true
            }.doOnNext { Observable.just(messageToSend) }
    }
    .zipWith( // 1 second delay between emissions.
        Observable.interval(1, TimeUnit.SECONDS),
        BiFunction { item: MessageToSend, _: Long -> item })
    .flatMap { messageToSend ->
        App.context.repository.storeMessageSent(messageToSend)
            .doOnError {
                messageToSend.failureSending = true
            }.doOnNext { Observable.just(messageToSend) }
    }
    .doOnNext { messageToSend ->
        msgToSendPublisher.onNext(0)
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { messageToSend ->
        },
        { ex ->
            if (ex is EmptyResultSetException) {

            } else {
            }
        },
        {
            // Complete
        }
    )
    }

Очевидно, что-то естьне так с моим потоком.Поток должен переходить к следующей записи только при вызове msgToSendPublisher.onNext (0).

Что вызывает повторение потока для каждой записи дважды?

...