Как правильно ждать ответа? - PullRequest
0 голосов
/ 12 мая 2019

Я отправляю сообщение (пользовательский протокол, без HTTP) на мой сервер и хочу дождаться ответа.Он работает со следующим кодом:


class Connection {

val messages: Observable<Message>
fun sendMessageWithAnswer(message: Message, timeout:Int = 10): Observable<Answer> {
        if (!isConnected) {
            return Observable.just(Answer.NoConnection)
        }

       val result = BehaviorSubject.create<Answer>()
       val neverDisposed = messages.filter {
            it.header.messageId == message.header.messageId
        }
        .map { Answer.Success(it) as Answer}
        .mergeWith(Observable.timer(timeout.toLong(), TimeUnit.SECONDS)
        .map { Answer.Timeout })
        .take(1).singleOrError()
        .subscribe(
                {result.onNext(it)},
                {
                    // Should never happen
                    throw IllegalStateException("Waiting for answer failed: $it")
                }
        )
        sendMessage(message)
        return result
    }
}

Проблема с этим решением в том, что "neverDisposed" никогда не удаляется, это утечка памяти?Мои другие решения не работают для этого теста:

    @Test
    fun ImmediateAnswer() {
        prepare()
        val message = ...
        val answerObservable = connection.sendMessageWithAnswer(message, timeout = 1)
        connection.receiveMessage(message)
        val answer = answerObservable.test()
        answer.awaitCount(1)
        Thread.sleep(1000)
        Assert.assertEquals(1, answer.valueCount())
        Assert.assertEquals(Answer.Success(message), answer.values()[0])
    }

У вас есть более чистое решение этой проблемы?

...