Несоответствие типов в rxJava - PullRequest
1 голос
/ 06 июня 2019

Я пытаюсь повторно использовать некоторый код, который я помещаю в оператор retryWhen, но он не скомпилируется, потому что говорит, что я предоставляю неправильный тип.

Следующий код содержит оператор retryWhen, который работает нормально:

var startPos = 0
val startPositions = BehaviorSubject.createDefault(startPos)

startPositions.flatMap { startPos -> App.context.repository.getConnections(startPos) }
    .flatMap { connections ->
        App.context.repository.storeConnections(connections)
    }
    .doOnNext { connections ->
        run {
            if (connections.isNotEmpty()) {
                startPos += 40
                startPositions.onNext(startPos)
            }
        }
    }
    .retryWhen(ExpBackoff(DefaultJitter(), delay = 1, unit = TimeUnit.SECONDS, retries = 5)) // About 30 seconds max.
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { connections ->
            if (connections.isEmpty()) {
                getConnectionsCompleted()
            }
        }
    )

Вот код, в котором я хочу повторно использовать оператор retryWhen:

App.context.repository.getUserSettings()
    .subscribeOn(Schedulers.io())
    .retryWhen(ExpBackoff(DefaultJitter(), delay = 1, unit = TimeUnit.SECONDS, retries = 5)) // About 30 seconds max.
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { userSettings ->
            App.context.repository.cachedUserSettings = userSettings

        }
    )

Класс ExpBackoff:

class ExpBackoff(
    private val jitter: Jitter,
    private val delay: Long,
    private val unit: TimeUnit,
    private val retries: Int = 0
) : io.reactivex.functions.Function<Observable<out Throwable>, Observable<Long>> {

    @Throws(Exception::class)
    override fun apply(observable: Observable<out Throwable>): Observable<Long> {
        return observable
            .zipWith(Observable.range(1, retries + 1), BiFunction<Throwable, Int, Int> { _, retryCount ->
                retryCount
            })
            .flatMap { attemptNumber ->
                if (attemptNumber == retries)
                    throw java.lang.Exception()
                else
                    Observable.timer(getNewInterval(attemptNumber), unit)
            }
    }

    private fun getNewInterval(retryCount: Int): Long {
        var newInterval = (delay * Math.pow(retryCount.toDouble(), 2.0) * jitter.get()).toLong()
        if (newInterval < 0) {
            newInterval = Long.MAX_VALUE
        }
        return newInterval
    }
}

Объявления API:

override fun getConnections(startPos: Int): Observable<List<Connection>> {
    return api.getConnections(startPos)
}

override fun getUserSettings(): Single<UserSettings> {
    return api.getSettings()
}

Ошибка компиляции указывает на «несоответствие типов»:

Обязательно: Function !, out Publisher <*>!>!

Найдено: ExpBackoff

Что мне нужно сделать, чтобы повторить попытку при работе во втором примере?

...