Отправить сообщение только определенному клиенту, используя веб-сокеты с Rsocket и Spring Webflux - PullRequest
1 голос
/ 05 апреля 2020

Я пытаюсь использовать Rsocket с websocket в одном из моих проектов PO C. В моем случае логин пользователя не требуется. Я хотел бы отправить сообщение только определенным клиентам, когда получаю сообщение от другого сервиса. По сути, мой поток выглядит следующим образом.

                                  Service A                               Service B   
|--------|    websocket     |------------------|   Queue based comm   |---------------| 
|  Web   |----------------->| Rsocket server   |--------------------->| Another       | 
|        |<-----------------| using Websocket  |<---------------------| service       |
|--------|    websocket     |------------------|   Queue based comm   |---------------|

В моем случае я думаю использовать уникальный идентификатор для каждого соединения и каждого запроса. Объедините оба идентификатора в качестве идентификатора корреляции и отправьте сообщение в Сервис B , а когда я получу сообщение от Сервис B , определите, к какому клиенту нужно go, и отправьте его. Теперь я понимаю, что мне могут не понадобиться 2 службы для этого, но я делаю это по нескольким другим причинам. Хотя у меня есть приблизительное представление о том, как реализовать другие части. Я новичок в концепции Rsocket. Можно ли отправить сообщение единственному определенному клиенту по определенному идентификатору, используя Spring Boot Webflux, Rsocket и websocket?

Ответы [ 2 ]

3 голосов
/ 27 апреля 2020

В принципе, я думаю, у вас есть два варианта здесь. Первый - это фильтрация потока, поступающего из Service B, второй - использование RSocketRequester и Map, как описано @NikolaB.

Первый вариант:

data class News(val category: String, val news: String)
data class PrivateNews(val destination: String, val news: News)

class NewsProvider {

    private val duration: Long = 250

    private val externalNewsProcessor = DirectProcessor.create<News>().serialize()
    private val sink = externalNewsProcessor.sink()

    fun allNews(): Flux<News> {
        return Flux
                .merge(
                        carNews(), bikeNews(), cosmeticsNews(),
                        externalNewsProcessor)
                .delayElements(Duration.ofMillis(duration))
    }

    fun externalNews(): Flux<News> {
        return externalNewsProcessor;
    }

    fun addExternalNews(news: News) {
        sink.next(news);
    }

    fun carNews(): Flux<News> {
        return Flux
                .just("new lambo!!", "amazing ferrari!", "great porsche", "very cool audi RS4 Avant", "Tesla i smarter than you")
                .map { News("CAR", it) }
                .delayElements(Duration.ofMillis(duration))
                .log()
    }

    fun bikeNews(): Flux<News> {
        return Flux
                .just("specialized enduro still the biggest dream", "giant anthem fast as hell", "gravel long distance test")
                .map { News("BIKE", it) }
                .delayElements(Duration.ofMillis(duration))
                .log()
    }

    fun cosmeticsNews(): Flux<News> {
        return Flux
                .just("nivea - no one wants to hear about that", "rexona anti-odor test")
                .map { News("COSMETICS", it) }
                .delayElements(Duration.ofMillis(duration))
                .log()
    }

}

@RestController
@RequestMapping("/sse")
@CrossOrigin("*")
class NewsRestController() {
    private val log = LoggerFactory.getLogger(NewsRestController::class.java)

    val newsProvider = NewsProvider()

    @GetMapping(value = ["/news/{category}"], produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun allNewsByCategory(@PathVariable category: String): Flux<News> {
        log.info("hello, getting all news by category: {}!", category)
        return newsProvider
                .allNews()
                .filter { it.category == category }
    }
}

Класс NewsProvider - это симуляция вашего Service B, которая должна вернуть Flux<>. Всякий раз, когда вы вызываете addExternalNews, вы получаете pu sh News, возвращаемое методом allNews. В классе NewsRestController мы фильтруем новости по категориям. Откройте браузер на localhost:8080/sse/news/CAR, чтобы видеть только автомобильные новости.

Если вы хотите вместо этого использовать RSocket, вы можете использовать метод, подобный этому:

    @MessageMapping("news.{category}")
    fun allNewsByCategory(@DestinationVariable category: String): Flux<News> {
        log.info("RSocket, getting all news by category: {}!", category)
        return newsProvider
                .allNews()
                .filter { it.category == category }
    }

Второй вариант:

Давайте сохраним RSocketRequester в HashMap (я использую vavr.io) с @ConnectMapping.

@Controller
class RSocketConnectionController {

    private val log = LoggerFactory.getLogger(RSocketConnectionController::class.java)

    private var requesterMap: Map<String, RSocketRequester> = HashMap.empty()

    @Synchronized
    private fun getRequesterMap(): Map<String, RSocketRequester> {
        return requesterMap
    }

    @Synchronized
    private fun addRequester(rSocketRequester: RSocketRequester, clientId: String) {
        log.info("adding requester {}", clientId)
        requesterMap = requesterMap.put(clientId, rSocketRequester)
    }

    @Synchronized
    private fun removeRequester(clientId: String) {
        log.info("removing requester {}", clientId)
        requesterMap = requesterMap.remove(clientId)
    }

    @ConnectMapping("client-id")
    fun onConnect(rSocketRequester: RSocketRequester, clientId: String) {
        val clientIdFixed = clientId.replace("\"", "")  //check serialezer why the add " to strings
//        rSocketRequester.rsocket().dispose()   //to reject connection
        rSocketRequester
                .rsocket()
                .onClose()
                .subscribe(null, null, {
                    log.info("{} just disconnected", clientIdFixed)
                    removeRequester(clientIdFixed)
                })
        addRequester(rSocketRequester, clientIdFixed)
    }

    @MessageMapping("private.news")
    fun privateNews(news: PrivateNews, rSocketRequesterParam: RSocketRequester) {
        getRequesterMap()
                .filterKeys { key -> checkDestination(news, key) }
                .values()
                .forEach { requester -> sendMessage(requester, news) }
    }

    private fun sendMessage(requester: RSocketRequester, news: PrivateNews) {
        requester
                .route("news.${news.news.category}")
                .data(news.news)
                .send()
                .subscribe()
    }

    private fun checkDestination(news: PrivateNews, key: String): Boolean {
        val list = destinations(news)
        return list.contains(key)
    }

    private fun destinations(news: PrivateNews): List<String> {
        return news.destination
                .split(",")
                .map { it.trim() }
    }
}

Обратите внимание, что мы должны добавить две вещи в клиент rsocket-js: полезная нагрузка в кадре SETUP для предоставления идентификатора клиента и регистрации ответчика для обработки сообщений, отправленных RSocketRequester.

const client = new RSocketClient({
// send/receive JSON objects instead of strings/buffers
serializers: {
  data: JsonSerializer,
  metadata: IdentitySerializer
},
setup: {
  //for connection mapping on server
  payload: {
    data: "provide-unique-client-id-here",
    metadata: String.fromCharCode("client-id".length) + "client-id"
  },
  // ms btw sending keepalive to server
  keepAlive: 60000,

  // ms timeout if no keepalive response
  lifetime: 180000,

  // format of `data`
  dataMimeType: "application/json",

  // format of `metadata`
  metadataMimeType: "message/x.rsocket.routing.v0"
},
responder: responder,
transport
});

Для получения дополнительной информации об этом см. этот вопрос: Как обрабатывать сообщения, отправленные с сервера на клиент с помощью RSocket?

0 голосов
/ 07 апреля 2020

Я еще лично не использовал RSocket с транспортом WebSocket, но, как указано в спецификации RSocket, базовый транспортный протокол даже не должен быть важным.

Один компонент RSocket является одновременно сервером и клиентом. Поэтому, когда браузеры подключаются к вашему «серверу» RSocket, вы можете добавить экземпляр RSocketRequester, который затем можно использовать для отправки сообщений «клиенту».

Затем вы можете добавить эти экземпляры в локальный кеш (например, поместить их в какой-нибудь глобально доступный ConcurrentHashMap с ключом по вашему выбору - что-то, из чего вы будете знать / сможете рассчитывать, каким клиентам следует сообщение от службы B будет распространено).

Затем в коде, где вы получаете сообщение от службы B, просто извлеките все экземпляры RSocketRequester из локального кэша, которые соответствуют вашим критериям, и отправьте им сообщение.

...