Использование Rx Java для уровня ответа на запрос с помощью WebSockets - PullRequest
0 голосов
/ 06 мая 2020

Я пытаюсь реализовать уровень запроса -> ответа поверх веб-сокетов в Java. Недавно я наткнулся на RxJava, который кажется хорошей библиотекой для этого. Ниже приведен мой текущий подход к обработке потока ответов на запрос (неважный код опущен для удобства чтения)

public class SimpleServer extends WebSocketServer {
  Gson gson = new Gson();
  Map<String, Function<JsonObject, Void>> requests = new HashMap<>();

  private static int count = 0;

  public SimpleServer(InetSocketAddress address) {
    super(address);
  }

  @Override
  public void onMessage(WebSocket conn, String message) {
    String type = ...;
    JsonObject payload = ...;
    if (type.equals("response")) {
      Request request = requests.get(requestId).apply(payload);
    }
  }

  public Single<JsonObject> request(String action) {
    requests.put(Integer.toString(count++), response -> {
      source.onSuccess(response);
      return null;
    });
    broadcast(...);
  }
}

Это жизнеспособное решение или есть лучший способ?
Я думал, есть ли способ использования RxJava для обоих способов, т.е. запрос будет прослушивать наблюдаемый объект onMessage или что-то в этом роде.
Любая помощь будет принята с благодарностью.

1 Ответ

0 голосов
/ 06 мая 2020

Вы можете использовать Rx Java для связи обоими способами. Начнем с более простого - получения сообщений. Я рекомендую вам использовать BehaviorRelay, что ведет себя как Observer, так и Consumer. Вы можете как прослушивать испускаемые значения, так и создавать значения - в нашем случае сообщения. Простая реализация может выглядеть так:

public class SimpleServer extends WebSocketServer {
    private BehaviorRelay<String> receivedMessages = BehaviorRelay.create();

    public SimpleServer(InetSocketAddress address) {
        super(address);
    }

    @Override
    public void onMessage(WebSocket conn, String message) {
        receivedMessages.accept(message); // "sends" value to the relay
    }

    public Observable<String> getReceivedMessagesRx() {
        return receivedMessages.hide(); // Cast Relay to Observable
    }

    //...

Теперь вы можете вызвать функцию getReceivedMessagesRx() и подписаться на входящие сообщения.

Теперь более интересная часть - отправка сообщений. Предположим, у вас есть Observable, который создает сообщения, которые вы хотите отправить:

    // ...

    private Disposable senderDisposable = Disposables.disposed(); // (1)

    public void setMessagesSender(Observable<String> messagesToSend) { // (2)
        senderDisposable = messagesToSend.subscribe(message -> { 
            broadcast(message);
        }, throwable -> {
            // handle broadcast error 
        });
    }

    public void clear() { // (3)
        senderDisposable.dispose(); 
    }
}

Что происходит здесь:

  1. Create Disposable, который содержит ссылку на запущенный наблюдатель сообщения, которые будут отправлены.
  2. Подпишитесь на пройденный Observable, который излучается каждый раз, когда вы хотите отправить сообщение. Эта функция предназначена для однократного вызова. Если вы хотите вызывать его несколько раз, обработайте удаление предыдущего отправителя или используйте CompositeDisposable для хранения нескольких одноразовых предметов.
  3. Когда вы закончите работу со своим сервером, не забудьте удалить отправителя сообщений.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...