Предположим, у меня есть список входных параметров:
val array = arrayOf(input1, input2, ... inputN) // N is usually less than 10
Мне нужно обработать эти параметры, выполнив несколько тяжелых вычислений. Поэтому для оптимизации я пытаюсь запускать каждое вычисление в своем собственном потоке, выполняемом одновременно с другими. Я использую для этого RxJava2:
sealed class Result {
object Success : Result()
object NotFound : Result()
}
fun processArray(arr: Array<Input>): Single<Result> {
val singles = arr.map { input ->
Single.fromCallable {
val time = System.currentTimeMillis()
val r = process(input)
log("$r, took ${System.currentTimeMillis() - time}ms")
return@fromCallable r
}
.subscribeOn(Schedulers.newThread())
}
return Single.zip(singles) { results ->
val r = results.map { it as Result }
.firstOrNull { it is Result.Success }
?: Result.NotFound
log("result is: $r")
return@zip r
}
}
fun process(input: Input): Result
Все работает, но когда я смотрю логи, я обычно вижу следующее:
NotFound, took 130ms
NotFound, took 300ms
Success, took 220ms
NotFound, took 78ms
NotFound, took 540ms
NotFound, took 256ms
result is Success
proccessing took 547ms
И это не имеет смысла, поскольку Мне нужен только первый успешный результат. Но этот код будет ждать, пока все они завершатся, даже если он уже нашел Result.Success
(как видно из журналов, общее затраченное время == 547 мс, потому что мы ждали элемента с NotFound, took 540ms
до fini sh , но на данный момент я получил Result.Success
, я знал, что остальное будет NotFound)
Итак, вопрос:
Можно ли запускать несколько Single.fromCallable()
и после найдя первый успешный результат, избавиться от остальных?