Здравствуйте, у меня следующая проблема. У меня есть приложение ASP NET Core
, и я хочу, чтобы клиенты могли использовать функцию Redis
pub-sub. Для этого для каждого подключенного клиента я буду использовать Websocket
, с которым:
1.Client will write his message to Redis / subscribe to a new channel
2.Whenever a new message arrives on a given channel i want to write it on the websocket back to the client.
Однако я хочу, чтобы направление server -> client
веб-сокета было общим для нескольких каналов (каждый делегат для данного канал будет записывать в один и тот же сокет) Мне понадобится какой-нибудь способ QUEUE всех входящих сообщений со всех подписанных каналов и из этой очереди, чтобы извлекать сообщения одно за другим в сокет.
I думал о том, чтобы использовать список, где каналы pu sh сообщений, и специальный Task
(никогда не заканчивающийся l oop), где я бы BLPOP
сообщений. Как я могу объединить сообщения в очередь, поскольку BLPOP
не реализовано?
Для более ясного понимания я разработал схему c:
Сервис
public sealed class ChatClient {
private Task writeTask;
private Task popperTask;
private WebSocket socket;
private RedisStore store;
private ISubscriber sub;
public ChatClient(WebSocket socket, RedisStore store) {
this.socket = socket;
this.store = store;
}
public async Task RunAsync() {
try {
this.sub = this.store.Connection.GetSubscriber();
this.writeTask = Task.Run(async () => await WriteLoopAsync());
this.popperTask = Task.Run(async () => await PopLoopAsync());
await Task.WhenAny(writeTask,popperTask);
} catch (Exception ex) {
throw;
}
}
private async Task WriteLoopAsync() {
while (true) {
//receive some message from socket
var message = await this.socket.ReceiveAsync([....]);
//find list of subscribed channels and if it does not exist subscribe to it
//publish message to target channel
await HandleMessageAsync(message);
}
}
private async Task HandleMessageAsync(Message msg) {
//subscribe to channel if it is not already subscribed;
await this.sub.SubscribeAsync(msg.Channel, async (channel, value) => {
await this.store.Database.ListRightPushAsync("popList", msg.Value);
});
await this.sub.PublishAsync(msg.Channel, msg.Value);
}
private async Task PopLoopAsync() {
while (true) {
//pop a message from the queue that is filled by channel delegates
var data = await this.store.Connection.BLPOP("popList");
//send the message on the websocket
this.socket.SendAsync(data);
}
}
}
RedisStore
public sealed class RedisStore {
public ConnectionMultiplexer Connection;
public RedisStore(string connectionString) {
this.Connection = ConnectionMultiplexer.Connect(connectionString);
}
public IDatabase Database => this.Connection.GetDatabase();
}
Сообщение
public class Message {
public string SenderID { get; set; }
public string Channel { get; set; }
public DateTime IssuedAt { get; set; }
public string Value { get; set; }
}