Вот мой пример использования: пользователь подписывается на мой поток с помощью websocket (GraphQl с подпиской), мне нужно вернуть экземпляр org.reactivestreams.Publisher
(который должен быть моей подпиской kafka topi c) по идентификатору пользователя.
Для иллюстрации примерно так:
/ **
* I don´t know how to get a instance of Publisher<Balance>
* It should be a consumer from a kafka topic
*/
fun balance(myStream: Publisher<Balance>, userId: String): Publisher<Balance> {
return myStream.filter { it.userId == userId }
}