Критическая по времени выборка источника, наблюдаемого с Rx Java? - PullRequest
1 голос
/ 26 марта 2020

Я бы хотел иметь возможность измерять выбросы из источника, наблюдаемого через равные промежутки времени, независимо от интенсивной работы ЦП в дальнейшем. Этот код демонстрирует проблему: он надежно производит выборку с интервалами 500 мс с doSlowOperation() закомментированным, а затем начинает отставать и дает несогласованные результаты, когда он не закомментирован.

    @Test
    fun `test sample operator lag`() {
        Observable.interval(100, TimeUnit.MILLISECONDS)
            .sample(500, TimeUnit.MILLISECONDS)
            .timestamp(TimeUnit.MILLISECONDS)
            .map {
                doSlowOperation()
                it
            }
            .take(4)
            .toList()
            .subscribe { item ->
                val itemsWithNormalisedTimestamps = item.map {
                    val timeSinceStartMs = (it.time() - item.first().time())
                    "${timeSinceStartMs}ms"
                }
                println("Got items: $itemsWithNormalisedTimestamps")
            }

        Thread.sleep(20000)
    }

    private fun doSlowOperation() {
        val list = mutableListOf<Int>()
        for (i in 0..10000000) list.add(i)
    }

Вывод с doSlowOperation() закомментированным:

Got items: [0ms, 500ms, 1001ms, 1499ms]

Вывод с doSlowOperation() без комментариев:

Got items: [0ms, 1610ms, 2093ms, 3157ms]

Обратите внимание, что хотя было бы хорошо, если бы медленная операция выполнялась более или менее в реальном времени, это не так чувствителен ко времени, как сама выборка. Т.е. не имеет значения, накопится ли отставание выборочных выбросов в какую-то очередь для обработки, когда они могут, пока выборка продолжает происходить с точными интервалами.

Я пытался сделать источник наблюдаемый горячий (с publish() и connect()), а также попытался перенести медленную операцию на другой планировщик с observeOn(Schedulers.newThread()) без какой-либо удачи.

...