У меня есть поток RxJava, который просматривает все записи в таблице, по одной, отправляет электронное письмо для каждого получателя в каждой записи и сохраняет статус того, была ли отправка успешной или нет.Этот цикл повторяется до тех пор, пока все получатели не будут прочитаны из базы данных и не отправлены им сообщения.У меня проблема в том, что код от начала потока (начиная с getMessagesToSend) до storeMessageSent вызывается дважды для каждого получателя (кроме первого получателя):
val msgToSendPublisher = BehaviorSubject.createDefault(0)
msgToSendPublisher.flatMap { startPos -> App.context.repository.getMessageToSend() }
.flatMap { messageToSend ->
App.context.repository.sendMessage(messageToSend)
.doOnError {
messageToSend.failureSending = true
}.doOnNext { Observable.just(messageToSend) }
}
.zipWith( // 1 second delay between emissions.
Observable.interval(1, TimeUnit.SECONDS),
BiFunction { item: MessageToSend, _: Long -> item })
.flatMap { messageToSend ->
App.context.repository.storeMessageSent(messageToSend)
.doOnError {
messageToSend.failureSending = true
}.doOnNext { Observable.just(messageToSend) }
}
.doOnNext { messageToSend ->
msgToSendPublisher.onNext(0)
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ messageToSend ->
},
{ ex ->
if (ex is EmptyResultSetException) {
} else {
}
},
{
// Complete
}
)
}
Очевидно, что-то естьне так с моим потоком.Поток должен переходить к следующей записи только при вызове msgToSendPublisher.onNext (0).
Что вызывает повторение потока для каждой записи дважды?