Потоковый клиент Java Rsocket - PullRequest
1 голос
/ 10 мая 2019

это продолжение Stateful Rsocket Application thread. Вот мой пример.

В настоящее время у нас есть три объекта CORBA, которые используются, как показано на диаграмме:

  • LoginObject (на который ссылка извлекается через NamingService). Клиенты могут вызывать метод login () для получения сеанса
  • Объект Session имеет различные методы для получения подробной информации о текущем контексте обслуживания и, что наиболее важно, для получения объекта транзакции
  • Объект Transaction может использоваться для выполнения различных команд с помощью универсального метода, который принимает в качестве параметров имя-команды и список пар ключ-значение. После того, как клиент выполнил n команд, он может зафиксировать или откатить транзакцию (также с помощью методов объекта Transaction).

enter image description here

поэтому здесь мы используем объект сеанса для выполнения транзакций в нашем сервисе. Чтобы заменить это на rsocket, мы написали простой протобуф:

message LoginRequest {
    string username = 1;
    string password = 2;
}

message LoginResponse {
    bool success = 1;
}

message Command {
    string command = 1;
}

message TransactionResult {
    string result = 1;
}

service SimpleService {
    rpc login (LoginRequest) returns (LoginResponse) {}
    rpc transaction (stream Command) returns (TransactionResult) {}
}

Идея состоит в том, что, как только пользователь войдет в систему, он начнет передавать команды на серверы. В конце концов, клиент либо фиксирует, либо удаляет изменения.

Итак, вот код клиента для начала передачи значений на сервер:

public class SimpleClientWrapper {
    SimpleServiceClient client;
    ...
    public void runClient() {
        SimpleServiceProto.LoginRequest login_request= SimpleServiceProto.LoginRequest.newBuilder().setUsername(this.username).setPassword(this.pass).build();
        System.out.println("Trying to log in...");
        client.login(login_request).doOnNext(response->{
            if(response.getSuccess()) {
                runCommands();
            }else {
                disconnect();
            }
        }).block();
    }

    public void runCommands() {
        System.out.println("Login successfull. About to run some commands.");
        Flux<Command> requests =
                Flux.range(1, 11)
                    .map(i -> "sending -> " + i)
                    .map(s -> Command.newBuilder().setCommand(s).build());


        TransactionResult response = client.transaction(requests).block();

        System.out.println("Result was: " + response.getResult());
    }

    ...
}

Как вы можете видеть, клиент войдет в систему с помощью метода runClient, и, если вход будет успешным, клиент выполнит метод runCommands, который просто выдаст некоторые значения.

На моем сервере, согласно protobuf, я создал метод транзакции:

    @Override
    public Mono<TransactionResult> transaction(Publisher<Command> messages, ByteBuf metadata) {
        return Flux.from(messages)
                 .windowTimeout(10, Duration.ofSeconds(500))
                 .take(1)
                 .flatMap(Function.identity())
                 .reduce(
                   new ConcurrentHashMap<Character, AtomicInteger>(),
                   (map, s) -> {
                     char[] chars = s.getCommand().toCharArray();
                     for (char c : chars) {
                       map.computeIfAbsent(c, _c -> new AtomicInteger()).incrementAndGet();
                     }

                     return map;
                   })
                 .map(
                   map -> {
                     StringBuilder builder = new StringBuilder();

                     map.forEach(
                       (character, atomicInteger) -> {
                         builder
                           .append("character -> ")
                           .append(character)
                           .append(", count -> ")
                           .append(atomicInteger.get())
                           .append("\n");
                       });

                     String s = builder.toString();

                     return TransactionResult.newBuilder().setResult(s).build();
                   });
    }

Теперь, когда я запускаю его, он не будет работать из-за ошибок блокировки на клиенте:

Exception in thread "main" java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-tcp-nio-4

Однако моя проблема не в том, что он не работает, вместо этого я хочу, чтобы каждый клиент имел сеанс с сервером, и для каждого из них сервер должен поддерживать состояние транзакции. Вот ссылка на git для полного кода: https://github.com/oe19fyfa/rsocket-clientstream-example

Итак, после всего этого написания мой вопрос: как правильно установить сеанс транзакции? Я знаю, что мой код очень любительский, поэтому я открыт для любых предложений?

...