Stateful Rsocket Application - PullRequest
       29

Stateful Rsocket Application

3 голосов
/ 03 мая 2019

В моем проекте я хочу, чтобы к услуге подключалось несколько клиентов.Я использую реализацию Java Rsocket.

Служба должна поддерживать состояние для каждого клиента.Теперь я могу управлять клиентами по некоторому идентификатору.Этот вариант я уже реализовал.Но я не хочу управлять сессией вручную, используя строки.

Поэтому другая идея заключается в идентификации клиентов по соединению Rsocket.Есть ли способ использовать канал Rsocket для идентификации конкретного клиента?

Представьте себе пример службы и пару клиентов.У каждого клиента есть канал Rsocket с запущенной и работающей службой.Есть ли способ идентифицировать этих клиентов на стороне сервера, используя канал Rsocket?Было бы удивительно, если бы вы могли показать программный пример такого поведения.Спасибо!

РЕДАКТИРОВАТЬ (описание случая более подробно)

Вот мой пример.

В настоящее время у нас есть три объекта CORBA, которые используются какпродемонстрировано на диаграмме:

  • LoginObject (на который извлекается ссылка через NamingService).Клиенты могут вызывать метод login () для получения сеанса
  • Объект Session имеет различные методы для получения подробных сведений о текущем контексте обслуживания и, что наиболее важно, для получения объекта Transaction
  • Объект Transaction можетиспользоваться для выполнения различных команд через универсальный метод, который принимает имя_команды и список пар ключ-значение в качестве параметров.После того, как клиент выполнил n команд, он может зафиксировать или откатить транзакцию (также с помощью методов объекта Transaction).

enter image description here

, поэтому здесь мыиспользуйте объект сеанса для выполнения транзакций в нашем сервисе.

Теперь мы решили перейти от CORBA к Rsocket.Таким образом, нам нужен микросервис Rsocket, чтобы иметь возможность хранить состояние сеанса, иначе мы не можем знать, что будет зафиксировано или откатано.Можно ли это сделать только с отдельным издателем для каждого клиента?

Ответы [ 3 ]

3 голосов
/ 15 мая 2019

Вот пример, который я сделал на днях, который создаст RSocket с состоянием с использованием брокера Netifi: https://github.com/netifi/netifi-stateful-socket

К сожалению, вам нужно построить нашу ветку разработки локально, чтобы опробовать ее (https://github.com/netifi/netifi-java) - к концу недели должен появиться релиз с кодом, если вы не хотите создавать его локально.

Я тоже работаю над чистым примером RSocket, но если вы хотитеПосмотрите, как он взглянул бы на StatefulSocket, найденный в примере. Он должен дать вам подсказку, как работать с сеансом с чистым RSocket.

Относительно ваших других вопросов о менеджере транзакций - вам нужно будетсвяжите вашу транзакцию с сигналами Reactive Streams, которые отправляются - если вы получили отмену, onError вы откатились, а если получили onComplete, вы бы зафиксировали транзакцию. Есть методы побочных эффектов от Flux / Mono, которые должны сделатьс этим легко справиться. В зависимости от того, что вы делаете, вы также можете использовать BaseSubscriber, так как он имеет зацепкииметь дело с различными сигналами реактивных потоков.

Спасибо, Роберт

2 голосов
/ 16 мая 2019

Пример возобновления соединений, то есть поддержания состояния на сервере, находится в репозитории rsocket-java

https://github.com/rsocket/rsocket-java/commit/d47629147dd1a4d41c7c8d5af3d80838e01d3ba5

Возобновляет все соединение, включая любое состояние, с которым оно связанос каждым отдельным каналом и т. д.

Существует проект rsocket-cli , который позволяет вам попробовать это.Запустите и остановите процесс socat и наблюдайте за ходом работы клиента и сервера.

$ socat -d TCP-LISTEN:5001,fork,reuseaddr TCP:localhost:5000
$ ./rsocket-cli --debug --resume --server -i cli:time tcp://localhost:5000
$ ./rsocket-cli -i client --stream --resume tcp://localhost:5001
0 голосов
/ 09 мая 2019

Из вашего описания похоже, что channel будет работать лучше, я раньше не использовал канал, поэтому не могу гарантировать (извините). Но то, что я бы порекомендовал вам попробовать что-то вроде этого:

Контроллер транскрипции:

public class TransactionController implements Publisher<Payload> {

    List<Transaction> transcations = new ArrayList<>();

    @Override
    public void subscribe(Subscriber<? super Payload> subscriber) {

    }

    public void processPayload(Payload payload) {
        // handle transcations...
    }
}

И в вашей RSocket реализации переопределите requestChannel:

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
    // Create new controller for each channel
    TranscationController cntrl = new TranscationController();
    Flux.from(payloads)
      .subscribe(cntrl::processPayload);
    return Flux.from(cntrl);
}
...