Как реализовать потоки Redis с C# Rx - PullRequest
0 голосов
/ 26 мая 2020

Поскольку я не смог найти ни одной реализации, в которой мы не использовали бы al oop для получения содержимого потока, я начинаю реализовывать ее, но я столкнулся с несколькими проблемами, которые, возможно, некоторые из вас могут указать мне в нужное место.

Реализация использует комбинацию Pub / Sub и потока: * log -> stream channel * log: notification -> pub / sub * log: lastReadMessage -> содержит последний ключ чтения из потока

Издатель

        static async Task Main(string[] args)
        {
            var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
            var redisDb =  connectionMultiplexer.GetDatabase(1);

            while(true)
            {
                var value =  new NameValueEntry[]
                {
                    new NameValueEntry("id", Guid.NewGuid().ToString()),
                    new NameValueEntry("timestamp", DateTime.UtcNow.ToString())
                };

                redisDb.StreamAdd("log", value);
                var publisher = connectionMultiplexer.GetSubscriber();
                publisher.Publish("log:notify", string.Empty, CommandFlags.None);
                await Task.Delay(TimeSpan.FromSeconds(1));
            }
        }

Подписчик

        static async Task Main(string[] args)
        {
            var connectionMultiplexer = await ConnectionMultiplexer.ConnectAsync("localhost");
            var redisDb =  connectionMultiplexer.GetDatabase(1);


            var observableStream =  CreateTaskFromStream(connectionMultiplexer, redisDb, "log")
                .Subscribe(x => {
                  Console.WriteLine(x);  
                });

            Console.ReadLine();
        }
        private static SemaphoreSlim taskFromStreamBlocker = new SemaphoreSlim(1);

        private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }


            return Observable.Create<string>(obs => 
            {
                var subscriber = connection.GetSubscriber();
                subscriber.Subscribe($"{channel}:notify", async (ch, msg) => 
                {
                    var locker = await taskFromStreamBlocker
                        .WaitAsync(0)
                        .ConfigureAwait(false);

                    if (!locker)
                    {
                        return;
                    }

                    var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                    foreach(var message in messages)
                    {
                        obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                        lastReadMessage = message.Id;
                    }

                    redisDb.KeyDelete($"{channel}:lastReadMessage");
                    redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);

                    taskFromStreamBlocker.Release();
                });

                return Disposable.Create(() => subscriber.Unsubscribe(channel));
            });
        }

Почему семафор?

Потому что у меня могло быть много сообщений добавить в поток, и я не хочу, чтобы одно и то же сообщение обрабатывалось дважды.

ПРОБЛЕМЫ

  1. Если у нас есть необработанные сообщения в потоке, как мы можем обрабатывать, не имея события из Pub / Sub. При запуске мы можем проверить, является ли это необработанным сообщением, и обработать его. Если в течение этого времени в поток добавляется новое сообщение, а мы еще не подписываемся на Pub / sub, подписчик не будет обрабатывать сообщение, пока мы не получим уведомление через Pub / Sub.

  2. Семафор важен, чтобы не обрабатывать одно и то же сообщение дважды, но в то же время это проклятие. В процессе сообщения в поток можно добавить еще одно. Когда это произойдет, подписчик будет обрабатывать не сразу, а только в следующий раз, когда он получит уведомление (в этот момент обработает два сообщения).

Как бы вы это реализовали? Есть ли реализация потоков Redis только с использованием Rx? Решение не должно использовать какой-то l oop и эффективно использовать память. Возможно ли это?

С наилучшими пожеланиями

Пауло Абоим Пинто

Ответы [ 3 ]

0 голосов
/ 26 мая 2020

и это другое решение, использующее таймер с временем истечения 200 мс


        private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }

            var instance = ThreadPoolScheduler.Instance;

            return Observable.Create<string>(obs => 
            {
                var disposable = Observable
                    .Interval(TimeSpan.FromMilliseconds(200), instance)
                    .Subscribe(async _ => 
                    {
                        var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                        foreach(var message in messages)
                        {
                            obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                            lastReadMessage = message.Id;
                        }

                        redisDb.KeyDelete($"{channel}:lastReadMessage");
                        redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
                    });
                cancellationToken.Register(() => disposable.Dispose());

                return Disposable.Empty;    
            });
       }

0 голосов
/ 29 мая 2020

Я использую тайтовый l oop просто делаю XRange и сохраняю позицию - KISS .. но если нет работы, он отступает, так что это довольно быстро, когда много происходит, его тайтовое l oop.

Если вам нужна более высокая производительность, например, чтение во время обработки, я бы предостерегал от этого в большинстве случаев.

  1. Это создает большую сложность, и это должно быть круто solid.
  2. Redis обычно достаточно быстр
  3. «Я не хочу дважды обработать одно и то же сообщение ". почти каждая система имеет хотя бы одну поставку, поэтому устранение сбоев в работе невероятно сложно / медленно. Вы можете частично удалить его, используя хэш-набор идентификаторов, но для потребителей довольно тривиально иметь дело с ним и сообщениями, предназначенными для идемпотентности. Вероятно, это root причина проблем с дизайном сообщений. Если вы разделите каждый считыватель (отдельный поток и 1 рабочий на поток), вы можете сохранить хэш-набор в памяти, избегая проблем с масштабированием / распределением. Обратите внимание, что поток Redis может сохранять порядок, используйте это для упрощения идемпотентных сообщений.
  4. Исключения, вы не хотите останавливать обработку потока, потому что у потребителя есть исключение logi c в 1 сообщении, например, при звонке ночью вся система остановилась, блокировки усугубляют ситуацию. Данные о событии не могут быть изменены, это произошло, поэтому постарайтесь сделать все возможное. Тем не менее, исключения в инфракрасном / редком режиме все же необходимо бросить и повторить. Управлять этим за пределами al oop очень болезненно.
  5. Простое противодавление. Если вы не можете обрабатывать работу достаточно быстро, l oop замедляется вместо того, чтобы создавать множество задач и взрывать всю вашу память.

Я больше не использую распределенные блокировки / семафоры.

Если вы имеете дело с командами, например, dosomething вместо xyz, они могут потерпеть неудачу. Опять же, потребитель должен иметь дело со случаем, когда это уже произошло, а не с частью чтения redis / stream.

Некоторые библиотеки с magi c обратными вызовами не решают эти проблемы, обратные вызовы будут иметь повторную попытку по истечении времени ожидания на любом узле и c. Сложность / проблемы все еще существуют, они просто перемещаются в другое место.

У вас может быть наблюдаемое сверху для потребителей, но это в основном cosmeti. вы увидите тот же l oop. Я бы не использовал это вместо того, чтобы заставить потребителя зарегистрировать действие.

например,

    public interface IStreamSubscriber
    {
        void RegisterEventCallBack(Func<object, IReadOnlyDictionary<string, string>, Task> callback);
        void RegisterBatchEventCallBack(Func<IEnumerable<(object msg, IReadOnlyDictionary<string, string> metaData)>, Task> batchCallback);
        void Start();
    }    

В вашем случае обратный вызов может иметь наблюдаемое и не использовать l oop, но там - это низкий уровень l oop внизу, который также может выполнять преобразование сообщения в объект для потребителя.

0 голосов
/ 26 мая 2020

это решение с WHILE, которого я хочу избежать

        private static IObservable<string> CreateTaskFromStream(ConnectionMultiplexer connection, IDatabase redisDb, string channel, CancellationToken cancellationToken)
        {
            var lastReadMessage = "0-0";

            var lastReadMessageData = redisDb.StringGet($"{channel}:lastReadMessage", CommandFlags.None);
            if (string.IsNullOrEmpty(lastReadMessageData))
            {
                redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);
            }
            else
            {
                lastReadMessage = lastReadMessageData;
            }

            return Observable.Create<string>(async obs => 
            {
                while(!cancellationToken.IsCancellationRequested)
                {
                    var messages = await redisDb.StreamReadAsync(channel, lastReadMessage);

                    foreach(var message in messages)
                    {
                        obs.OnNext($"{message.Id} -> {message.Values[0].Name}: {message.Values[0].Value} / {message.Values[1].Name}: {message.Values[1].Value}");
                        lastReadMessage = message.Id;
                    }

                    redisDb.KeyDelete($"{channel}:lastReadMessage");
                    redisDb.StringGetSet($"{channel}:lastReadMessage", lastReadMessage);

                    await Task.Delay(TimeSpan.FromMilliseconds(500));
                }

                return Disposable.Empty;
            });
        }
...