Как получить данные из нескольких каналов Redis и записать их в один и тот же веб-сокет? - PullRequest
0 голосов
/ 12 апреля 2020

Здравствуйте, у меня следующая проблема. У меня есть приложение ASP NET Core, и я хочу, чтобы клиенты могли использовать функцию Redis pub-sub. Для этого для каждого подключенного клиента я буду использовать Websocket, с которым:

1.Client will write his message to Redis / subscribe to a new channel
2.Whenever a new message arrives on a given channel i want to write it on the websocket back to the client.

Однако я хочу, чтобы направление server -> client веб-сокета было общим для нескольких каналов (каждый делегат для данного канал будет записывать в один и тот же сокет) Мне понадобится какой-нибудь способ QUEUE всех входящих сообщений со всех подписанных каналов и из этой очереди, чтобы извлекать сообщения одно за другим в сокет.

I думал о том, чтобы использовать список, где каналы pu sh сообщений, и специальный Task (никогда не заканчивающийся l oop), где я бы BLPOP сообщений. Как я могу объединить сообщения в очередь, поскольку BLPOP не реализовано?

Для более ясного понимания я разработал схему c: enter image description here

Сервис

public sealed class ChatClient {

        private Task writeTask;
        private Task popperTask;
        private WebSocket socket;
        private RedisStore store;
        private ISubscriber sub;

        public ChatClient(WebSocket socket, RedisStore store) {
            this.socket = socket;
            this.store = store;
        }


        public async Task RunAsync() {
            try {
                this.sub = this.store.Connection.GetSubscriber();
                this.writeTask = Task.Run(async () => await WriteLoopAsync());
                this.popperTask = Task.Run(async () => await PopLoopAsync());
                await Task.WhenAny(writeTask,popperTask);
            } catch (Exception ex) {

                throw;
            }
         }

        private async Task WriteLoopAsync() {

            while (true) {
                //receive some message from socket
                var message = await this.socket.ReceiveAsync([....]); 
                //find list of subscribed channels and if it does not exist subscribe to it
                //publish message to target channel
                await HandleMessageAsync(message);
            }
        }

        private async Task HandleMessageAsync(Message msg) {
            //subscribe to channel if it is not already subscribed;
            await this.sub.SubscribeAsync(msg.Channel, async (channel, value) => {
                await this.store.Database.ListRightPushAsync("popList", msg.Value);
            });
            await this.sub.PublishAsync(msg.Channel, msg.Value);
        }

        private async Task PopLoopAsync() {
            while (true) {
                //pop a message from the queue that is filled by channel delegates
                var data = await this.store.Connection.BLPOP("popList");
                //send the message on the websocket
                this.socket.SendAsync(data);
            }
        }
    }

RedisStore

 public sealed class RedisStore {
        public ConnectionMultiplexer Connection;
        public RedisStore(string connectionString) {
            this.Connection = ConnectionMultiplexer.Connect(connectionString);
        }
        public IDatabase Database => this.Connection.GetDatabase();
    }

Сообщение

public class Message {
        public string SenderID { get; set; }
        public string Channel { get; set; }
        public DateTime IssuedAt { get; set; }
        public string Value { get; set; }
    }
...