Список одиночных игр, используйте только первый успешный результат - PullRequest
0 голосов
/ 05 мая 2020

Предположим, у меня есть список входных параметров:

val array = arrayOf(input1, input2, ... inputN) // N is usually less than 10

Мне нужно обработать эти параметры, выполнив несколько тяжелых вычислений. Поэтому для оптимизации я пытаюсь запускать каждое вычисление в своем собственном потоке, выполняемом одновременно с другими. Я использую для этого RxJava2:

sealed class Result {
   object Success : Result()
   object NotFound : Result()
}

fun processArray(arr: Array<Input>): Single<Result> {
    val singles = arr.map { input ->
        Single.fromCallable { 
            val time = System.currentTimeMillis()
            val r = process(input) 

            log("$r, took ${System.currentTimeMillis() - time}ms")
            return@fromCallable r
        }
            .subscribeOn(Schedulers.newThread())
    }

    return Single.zip(singles) { results ->
        val r = results.map { it as Result }
            .firstOrNull { it is Result.Success }
            ?: Result.NotFound

        log("result is: $r")
        return@zip r
    }
} 

fun process(input: Input): Result

Все работает, но когда я смотрю логи, я обычно вижу следующее:

NotFound, took 130ms
NotFound, took 300ms
Success, took 220ms
NotFound, took 78ms
NotFound, took 540ms
NotFound, took 256ms
result is Success
proccessing took 547ms

И это не имеет смысла, поскольку Мне нужен только первый успешный результат. Но этот код будет ждать, пока все они завершатся, даже если он уже нашел Result.Success (как видно из журналов, общее затраченное время == 547 мс, потому что мы ждали элемента с NotFound, took 540ms до fini sh , но на данный момент я получил Result.Success, я знал, что остальное будет NotFound)

Итак, вопрос:

Можно ли запускать несколько Single.fromCallable() и после найдя первый успешный результат, избавиться от остальных?

1 Ответ

1 голос
/ 05 мая 2020

Вы можете объединить вместо zip, а затем отфильтровать, чтобы получить 1-й элемент типа Success, например

sealed class Result {
    object Success : Result()
    object NotFound : Result()
}

fun processArray(arr: Array<Input>): Single<Result> {
    val singles = arr.map { input ->
        Single.fromCallable {
            val time = System.currentTimeMillis()
            val r = process(input)

            log("$r, took ${System.currentTimeMillis() - time}ms")
            return@fromCallable r
        }
            .subscribeOn(Schedulers.newThread())
    }

    return Single
        .merge(singles)
        .filter { it is Result.Success }
        .firstElement()
        .switchIfEmpty(Single.just(Result.NotFound))
}

fun process(input: Input): Result
...