Вы можете использовать Rx Java для связи обоими способами. Начнем с более простого - получения сообщений. Я рекомендую вам использовать BehaviorRelay
, что ведет себя как Observer
, так и Consumer
. Вы можете как прослушивать испускаемые значения, так и создавать значения - в нашем случае сообщения. Простая реализация может выглядеть так:
public class SimpleServer extends WebSocketServer {
private BehaviorRelay<String> receivedMessages = BehaviorRelay.create();
public SimpleServer(InetSocketAddress address) {
super(address);
}
@Override
public void onMessage(WebSocket conn, String message) {
receivedMessages.accept(message); // "sends" value to the relay
}
public Observable<String> getReceivedMessagesRx() {
return receivedMessages.hide(); // Cast Relay to Observable
}
//...
Теперь вы можете вызвать функцию getReceivedMessagesRx()
и подписаться на входящие сообщения.
Теперь более интересная часть - отправка сообщений. Предположим, у вас есть Observable, который создает сообщения, которые вы хотите отправить:
// ...
private Disposable senderDisposable = Disposables.disposed(); // (1)
public void setMessagesSender(Observable<String> messagesToSend) { // (2)
senderDisposable = messagesToSend.subscribe(message -> {
broadcast(message);
}, throwable -> {
// handle broadcast error
});
}
public void clear() { // (3)
senderDisposable.dispose();
}
}
Что происходит здесь:
- Create
Disposable
, который содержит ссылку на запущенный наблюдатель сообщения, которые будут отправлены. - Подпишитесь на пройденный
Observable
, который излучается каждый раз, когда вы хотите отправить сообщение. Эта функция предназначена для однократного вызова. Если вы хотите вызывать его несколько раз, обработайте удаление предыдущего отправителя или используйте CompositeDisposable
для хранения нескольких одноразовых предметов. - Когда вы закончите работу со своим сервером, не забудьте удалить отправителя сообщений.