Я бы хотел иметь возможность измерять выбросы из источника, наблюдаемого через равные промежутки времени, независимо от интенсивной работы ЦП в дальнейшем. Этот код демонстрирует проблему: он надежно производит выборку с интервалами 500 мс с doSlowOperation()
закомментированным, а затем начинает отставать и дает несогласованные результаты, когда он не закомментирован.
@Test
fun `test sample operator lag`() {
Observable.interval(100, TimeUnit.MILLISECONDS)
.sample(500, TimeUnit.MILLISECONDS)
.timestamp(TimeUnit.MILLISECONDS)
.map {
doSlowOperation()
it
}
.take(4)
.toList()
.subscribe { item ->
val itemsWithNormalisedTimestamps = item.map {
val timeSinceStartMs = (it.time() - item.first().time())
"${timeSinceStartMs}ms"
}
println("Got items: $itemsWithNormalisedTimestamps")
}
Thread.sleep(20000)
}
private fun doSlowOperation() {
val list = mutableListOf<Int>()
for (i in 0..10000000) list.add(i)
}
Вывод с doSlowOperation()
закомментированным:
Got items: [0ms, 500ms, 1001ms, 1499ms]
Вывод с doSlowOperation()
без комментариев:
Got items: [0ms, 1610ms, 2093ms, 3157ms]
Обратите внимание, что хотя было бы хорошо, если бы медленная операция выполнялась более или менее в реальном времени, это не так чувствителен ко времени, как сама выборка. Т.е. не имеет значения, накопится ли отставание выборочных выбросов в какую-то очередь для обработки, когда они могут, пока выборка продолжает происходить с точными интервалами.
Я пытался сделать источник наблюдаемый горячий (с publish()
и connect()
), а также попытался перенести медленную операцию на другой планировщик с observeOn(Schedulers.newThread())
без какой-либо удачи.