Напишите такому подписчику канала
public class RedisHostingRunner : HostedService
{
private readonly IServiceProvider _serviceProvider;
IRedisSubscriber _subscriber;
public RedisHostingRunner(IServiceProvider serviceProvider)
{
_serviceProvider = serviceProvider;
_subscriber = _serviceProvider.GetRequiredService<RedisSubscriber>();
}
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
//while (!cancellationToken.IsCancellationRequested)
//{
_subscriber.SubScribeChannel();
//await Task.Delay(TimeSpan.FromSeconds(60), cancellationToken);
//}
}
public Task ShutdownAsync(CancellationToken cancellationToken = default)
{
return Task.CompletedTask;
}
}
И в вашем подписчике установите обработчик сообщений
public void SubScribeChannel()
{
_logger.LogInformation("!SubScribeChannel started!!");
string channelName = _config.ActiveChannelName;
var pubSub = _connectionMultiplexer.GetSubscriber();
try
{
pubSub.Subscribe(channelName, async (channel, message) => await MessageActionAsync(message, channel));
}
catch(Exception ex)
{
_logger.LogInformation(String.Format("!error: {0}", ex.Message));
}
Debug.WriteLine("EOF");
}
В своем обработчике сделайте свою работу
private async Task MessageActionAsync(RedisValue message, string channel)
{
try
{
Transformer t = new Transformer(_logger);
_logger.LogInformation(String.Format("!SubScribeChannel message received on message!! channel: {0}, message: {1}", channel, message));
string transformedMessage = Transformer.TransformJsonStringData2Message(message);
List<Document> documents = Transformer.Deserialize<List<Document>>(transformedMessage);
await MergeToMongoDb(documents, channel);
_logger.LogInformation("!Merged");
}
catch (Exception ex)
{
_logger.LogInformation(String.Format("!error: {0}", ex.Message));
}
}