это продолжение Stateful Rsocket Application thread. Вот мой пример.
В настоящее время у нас есть три объекта CORBA, которые используются, как показано на диаграмме:
- LoginObject (на который ссылка извлекается через NamingService). Клиенты могут вызывать метод login () для получения сеанса
- Объект Session имеет различные методы для получения подробной информации о текущем контексте обслуживания и, что наиболее важно, для получения объекта транзакции
- Объект Transaction может использоваться для выполнения различных команд с помощью универсального метода, который принимает в качестве параметров имя-команды и список пар ключ-значение. После того, как клиент выполнил n команд, он может зафиксировать или откатить транзакцию (также с помощью методов объекта Transaction).
поэтому здесь мы используем объект сеанса для выполнения транзакций в нашем сервисе.
Чтобы заменить это на rsocket, мы написали простой протобуф:
message LoginRequest {
string username = 1;
string password = 2;
}
message LoginResponse {
bool success = 1;
}
message Command {
string command = 1;
}
message TransactionResult {
string result = 1;
}
service SimpleService {
rpc login (LoginRequest) returns (LoginResponse) {}
rpc transaction (stream Command) returns (TransactionResult) {}
}
Идея состоит в том, что, как только пользователь войдет в систему, он начнет передавать команды на серверы. В конце концов, клиент либо фиксирует, либо удаляет изменения.
Итак, вот код клиента для начала передачи значений на сервер:
public class SimpleClientWrapper {
SimpleServiceClient client;
...
public void runClient() {
SimpleServiceProto.LoginRequest login_request= SimpleServiceProto.LoginRequest.newBuilder().setUsername(this.username).setPassword(this.pass).build();
System.out.println("Trying to log in...");
client.login(login_request).doOnNext(response->{
if(response.getSuccess()) {
runCommands();
}else {
disconnect();
}
}).block();
}
public void runCommands() {
System.out.println("Login successfull. About to run some commands.");
Flux<Command> requests =
Flux.range(1, 11)
.map(i -> "sending -> " + i)
.map(s -> Command.newBuilder().setCommand(s).build());
TransactionResult response = client.transaction(requests).block();
System.out.println("Result was: " + response.getResult());
}
...
}
Как вы можете видеть, клиент войдет в систему с помощью метода runClient, и, если вход будет успешным, клиент выполнит метод runCommands, который просто выдаст некоторые значения.
На моем сервере, согласно protobuf, я создал метод транзакции:
@Override
public Mono<TransactionResult> transaction(Publisher<Command> messages, ByteBuf metadata) {
return Flux.from(messages)
.windowTimeout(10, Duration.ofSeconds(500))
.take(1)
.flatMap(Function.identity())
.reduce(
new ConcurrentHashMap<Character, AtomicInteger>(),
(map, s) -> {
char[] chars = s.getCommand().toCharArray();
for (char c : chars) {
map.computeIfAbsent(c, _c -> new AtomicInteger()).incrementAndGet();
}
return map;
})
.map(
map -> {
StringBuilder builder = new StringBuilder();
map.forEach(
(character, atomicInteger) -> {
builder
.append("character -> ")
.append(character)
.append(", count -> ")
.append(atomicInteger.get())
.append("\n");
});
String s = builder.toString();
return TransactionResult.newBuilder().setResult(s).build();
});
}
Теперь, когда я запускаю его, он не будет работать из-за ошибок блокировки на клиенте:
Exception in thread "main" java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-tcp-nio-4
Однако моя проблема не в том, что он не работает, вместо этого я хочу, чтобы каждый клиент имел сеанс с сервером, и для каждого из них сервер должен поддерживать состояние транзакции. Вот ссылка на git для полного кода: https://github.com/oe19fyfa/rsocket-clientstream-example
Итак, после всего этого написания мой вопрос: как правильно установить сеанс транзакции? Я знаю, что мой код очень любительский, поэтому я открыт для любых предложений?