Я отправляю сообщение (пользовательский протокол, без HTTP) на мой сервер и хочу дождаться ответа.Он работает со следующим кодом:
class Connection {
val messages: Observable<Message>
fun sendMessageWithAnswer(message: Message, timeout:Int = 10): Observable<Answer> {
if (!isConnected) {
return Observable.just(Answer.NoConnection)
}
val result = BehaviorSubject.create<Answer>()
val neverDisposed = messages.filter {
it.header.messageId == message.header.messageId
}
.map { Answer.Success(it) as Answer}
.mergeWith(Observable.timer(timeout.toLong(), TimeUnit.SECONDS)
.map { Answer.Timeout })
.take(1).singleOrError()
.subscribe(
{result.onNext(it)},
{
// Should never happen
throw IllegalStateException("Waiting for answer failed: $it")
}
)
sendMessage(message)
return result
}
}
Проблема с этим решением в том, что "neverDisposed" никогда не удаляется, это утечка памяти?Мои другие решения не работают для этого теста:
@Test
fun ImmediateAnswer() {
prepare()
val message = ...
val answerObservable = connection.sendMessageWithAnswer(message, timeout = 1)
connection.receiveMessage(message)
val answer = answerObservable.test()
answer.awaitCount(1)
Thread.sleep(1000)
Assert.assertEquals(1, answer.valueCount())
Assert.assertEquals(Answer.Success(message), answer.values()[0])
}
У вас есть более чистое решение этой проблемы?