Как мне оптимизировать шаг удаления из очереди / списка повторов? - PullRequest
0 голосов
/ 25 июня 2019

Я реализовал систему очередей Redis, которая обрабатывает задачи. Задачи изначально помещаются в рабочую очередь. Всякий раз, когда процесс требует задачи, задача помещается в очередь обработки. Всякий раз, когда процесс завершает выполнение, соответствующая задача из очереди обработки удаляется. В случае сбоя процесса на полпути выполнения процесс извлекается из очереди обработки обратно в рабочую очередь.

В этой реализации я также симулировал процессы, используя redis-list, хотя это будет не так. Очередь процессов обрабатывает список активных процессов. Если процесс завершится неудачно, соответствующий тайм-аут задачи выведет его из очереди обработки обратно в рабочую очередь.

Каждый раз, когда я извлекаю элемент из какой-либо очереди или проверяю время ожидания, мне нужно перебрать каждый элемент очереди и выполнить обработку. Есть ли в любом случае, где я могу оптимизировать эти задачи, чтобы сказать O (1), как с помощью пары ключ-значение. Пожалуйста, предложите улучшения.

Класс телефона: // для эмуляции задачи

namespace RedisConsoleApp1
{
    public class Phone
    {

        public string ID { get; set; }
        public string Model { get; set; }

        public DateTime ProcessStartTime { get; set; }

    }

}

Класс процесса:

namespace RedisConsoleApp1
{
    public class Process
    {

        public string ID { get; set; }

        public Phone task {get; set;}
    }

}

Program.cs:

namespace RedisConsoleApp1
{

        public class RedisList_WorkTask<T> : IList<T>
        {
            private static ConnectionMultiplexer _cnn;
            public string key;
            public RedisList_WorkTask(string key)
            {
                this.key = key;
                _cnn = ConnectionMultiplexer.Connect("localhost");
            }
            private IDatabase GetRedisDb()
            {
                return _cnn.GetDatabase();
            }
            private string Serialize(object obj)
            {
                return JsonConvert.SerializeObject(obj);
            }
            private T Deserialize<T>(string serialized)
            {
                return JsonConvert.DeserializeObject<T>(serialized);
            }
            public void Insert(int index, T item)
            {
                var db = GetRedisDb();
                var before = db.ListGetByIndex(key, index);
                db.ListInsertBefore(key, before, Serialize(item));
            }
            public void RemoveAt(int index)
            {
                var db = GetRedisDb();
                var value = db.ListGetByIndex(key, index);
                if (!value.IsNull)
                {
                    db.ListRemove(key, value);
                }
            }
            public T this[int index]
            {
                get
                {
                    var value = GetRedisDb().ListGetByIndex(key, index);
                    return Deserialize<T>(value.ToString());
                }
                set
                {
                    Insert(index, value);
                }
            }
            public void Add(T item)
            {
                GetRedisDb().ListLeftPush(key, Serialize(item));
            }
            public void Push(T item)
            {
                GetRedisDb().ListLeftPush(key, Serialize(item));

            }

            public void Pop()
            {
                var popped_ele = GetRedisDb().ListRightPop(key);
                //var popped_ele = GetRedisDb().ListRightPopLeftPush(key, proc_q);
                //var curr_obj = Deserialize<Phone>(popped_ele.ToString());
                Console.WriteLine("Popped {0}", popped_ele);
                //Console.WriteLine(curr_obj.Model);
            }
            public void Push_proc(RedisKey proc_q, T item)
            {
                var db = GetRedisDb();
            //var popped_ele = GetRedisDb().ListRightPop(key);

                var popped_ele = Remove(item);
                db.ListLeftPush(proc_q, popped_ele);
            //var curr_obj = Deserialize<Phone>(popped_ele.ToString());
                Console.WriteLine("Popped {0}",popped_ele);
                //Console.WriteLine(curr_obj.Model);
            }
            public void Clear()
            {
                GetRedisDb().KeyDelete(key);
            }
            public bool Contains(T item)
            {
                for (int i = 0; i < Count; i++)
                {
                    if (GetRedisDb().ListGetByIndex(key, i).ToString().Equals(Serialize(item)))
                    {
                        return true;
                    }
                }
                return false;
            }
            public void CopyTo(T[] array, int arrayIndex)
            {
                GetRedisDb().ListRange(key).CopyTo(array, arrayIndex);
            }
            public int IndexOf(T item)
            {
                for (int i = 0; i < Count; i++)
                {
                    if (GetRedisDb().ListGetByIndex(key, i).ToString().Equals(Serialize(item)))
                    {
                        return i;
                    }
                }
                return -1;
            }
            public int Count
            {
                get { return (int)GetRedisDb().ListLength(key); }
            }
            public bool IsReadOnly
            {
                get { return false; }
            }
            public bool Remove(T item)
            {
                return GetRedisDb().ListRemove(key, Serialize(item)) > 0;
            }
            public IEnumerator<T> GetEnumerator()
            {
                for (int i = 0; i < this.Count; i++)
                {
                    yield return Deserialize<T>(GetRedisDb().ListGetByIndex(key, i).ToString());
                }
            }
            System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
            {
                for (int i = 0; i < this.Count; i++)
                {
                    yield return Deserialize<T>(GetRedisDb().ListGetByIndex(key, i).ToString());
                }
            }
        }
    public class Program
    {
        static void Main(string[] args)
        {
            //creating the queues
            var phoneList = new RedisList_WorkTask<Phone>("workTaskList");
            var phoneList_2 = new RedisList_ProcessingTask<Phone>("processingTaskList");
            var processList = new RedisList_Process<Process>("processList");

            //clearing up the queues
            phoneList.Clear();
            phoneList_2.Clear();
            processList.Clear();

            //creating sample tasks
            Phone p1 = new Phone() { ID = "1", Model = "abc", ProcessStartTime = DateTime.Now };
            Phone p2 = new Phone() { ID = "2", Model = "def", ProcessStartTime = DateTime.Now };
            Phone p3 = new Phone() { ID = "3", Model = "ghi", ProcessStartTime = DateTime.Now };
            Phone p4 = new Phone() { ID = "4", Model = "jkl", ProcessStartTime = DateTime.Now };
            Phone p5 = new Phone() { ID = "5", Model = "mno", ProcessStartTime = DateTime.Now };
            Phone p6 = new Phone() { ID = "6", Model = "pqr", ProcessStartTime = DateTime.Now };

            //creating the work queue
            Console.WriteLine("Creating the task working queue.....\n");
            phoneList.Push(p1);
            phoneList.Push(p2);
            phoneList.Push(p3);
            phoneList.Push(p4);
            phoneList.Push(p5);
            phoneList.Push(p6);



            Console.WriteLine("\nCount in task working queue: {0}", phoneList.Count);

            Console.WriteLine("\nCreating processes....");
            //creating the processes which fills up the task processing queue
            processList.Push(new Process { ID = "1", task = p1 }, phoneList.key, phoneList_2.key);
            processList.Push(new Process { ID = "2", task = p2 }, phoneList.key, phoneList_2.key);

            Console.WriteLine("\nNo of processes: {0}", processList.Count);

            //phoneList.Pop();
            //phoneList.Pop();
            //phoneList.Pop();
            Console.WriteLine("\nCount in task processing queue: {0}", phoneList_2.Count);
            Console.WriteLine("\nCount in task working queue: {0}", phoneList.Count);


            Console.WriteLine("\nReached the end!");

            Console.WriteLine("\nAll tasks currently in processing queue: ");
            phoneList_2.DisplayAll();

            //popping out timed-out tasks from task-processing queue to task work que 
            //phoneList_2.RefreshQueue(phoneList.key);

            processList.Pop(phoneList_2.key);

            //while (true)
            //{
            //    phoneList_2.RefreshQueue(phoneList.key);
            //}
            Console.ReadKey();
        }
    }
}

Additional.cs:

namespace RedisConsoleApp1
{

        public class RedisList_ProcessingTask<T> : IList<T>
        {
            private static ConnectionMultiplexer _cnn;
            public string key;
            public RedisList_ProcessingTask(string key)
            {
                this.key = key;
                _cnn = ConnectionMultiplexer.Connect("localhost");
            }
            private IDatabase GetRedisDb()
            {
                return _cnn.GetDatabase();
            }
            private string Serialize(object obj)
            {
                return JsonConvert.SerializeObject(obj);
            }
            private T Deserialize<T>(string serialized)
            {
                return JsonConvert.DeserializeObject<T>(serialized);
            }
            public void Insert(int index, T item)
            {
                var db = GetRedisDb();
                var before = db.ListGetByIndex(key, index);
                db.ListInsertBefore(key, before, Serialize(item));
            }
            public void RemoveAt(int index)
            {
                var db = GetRedisDb();
                var value = db.ListGetByIndex(key, index);
                if (!value.IsNull)
                {
                    db.ListRemove(key, value);
                }
            }
            public T this[int index]
            {
                get
                {
                    var value = GetRedisDb().ListGetByIndex(key, index);
                    return Deserialize<T>(value.ToString());
                }
                set
                {
                    Insert(index, value);
                }
            }
            public void Add(T item)
            {
                GetRedisDb().ListLeftPush(key, Serialize(item));
            }
            public void Push(T item)
            {
                GetRedisDb().ListLeftPush(key, Serialize(item));

            }
            public void Push_work(RedisKey k2, Phone item)
            {
                GetRedisDb().ListLeftPush(k2, Serialize(item));
                GetRedisDb().ListRemove(key, Serialize(item));
            }
            public void Pop()
            {
                var popped_ele = GetRedisDb().ListRightPop(key);
                Console.WriteLine(popped_ele);
            }
            public void Clear()
            {
                GetRedisDb().KeyDelete(key);
            }
            public bool Contains(T item)
            {
                for (int i = 0; i < Count; i++)
                {
                    if (GetRedisDb().ListGetByIndex(key, i).ToString().Equals(Serialize(item)))
                    {
                        return true;
                    }
                }
                return false;
            }
            public void CopyTo(T[] array, int arrayIndex)
            {
                GetRedisDb().ListRange(key).CopyTo(array, arrayIndex);
            }
            public int IndexOf(T item)
            {
                for (int i = 0; i < Count; i++)
                {
                    if (GetRedisDb().ListGetByIndex(key, i).ToString().Equals(Serialize(item)))
                    {
                        return i;
                    }
                }
                return -1;
            }
            public int Count
            {
                get { return (int)GetRedisDb().ListLength(key); }
            }
            public bool IsReadOnly
            {
                get { return false; }
            }
            public bool Remove(T item)
            {
                return GetRedisDb().ListRemove(key, Serialize(item)) > 0;
            }

            public void DisplayAll()
            {
                for (int i = 0; i < Count; i++)
                {
                   Console.WriteLine(GetRedisDb().ListGetByIndex(key, i).ToString());   
                }
            }
            public void RefreshQueue(RedisKey work_q)
            {
                for (int i = 0; i < Count; i++)
                {
                //var temp_obj = GetRedisDb().ListGetByIndex(key, i).ToString();
                //var curr_obj = Deserialize<T>(temp_obj);
                    var temp_obj = GetRedisDb().ListGetByIndex(key, i);
                    var curr_obj = Deserialize<Phone>(temp_obj.ToString());
                    var cur_time = DateTime.Now;
                    TimeSpan value = cur_time.Subtract(curr_obj.ProcessStartTime);
                    if (value > new TimeSpan(0, 0, 10))
                    {
                        Console.WriteLine("\nPushing {0} from task-processing queue to " +
                            "task-work queue due to timeout of {1}", Serialize(curr_obj), value);
                        Push_work(work_q, curr_obj);
                    }
                }
            }
            public IEnumerator<T> GetEnumerator()
            {
                for (int i = 0; i < this.Count; i++)
                {
                    yield return Deserialize<T>(GetRedisDb().ListGetByIndex(key, i).ToString());
                }
            }
            System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
            {
                for (int i = 0; i < this.Count; i++)
                {
                    yield return Deserialize<T>(GetRedisDb().ListGetByIndex(key, i).ToString());
                }
            }
        }
}

Additional2.cs:

namespace RedisConsoleApp1
{

    public class RedisList_Process<T> : IList<T>
    {
        private static ConnectionMultiplexer _cnn;
        public string key;
        public RedisList_Process(string key)
        {
            this.key = key;
            _cnn = ConnectionMultiplexer.Connect("localhost");
        }
        private IDatabase GetRedisDb()
        {
            return _cnn.GetDatabase();
        }
        private string Serialize(object obj)
        {
            return JsonConvert.SerializeObject(obj);
        }
        private T Deserialize<T>(string serialized)
        {
            return JsonConvert.DeserializeObject<T>(serialized);
        }
        public void Insert(int index, T item)
        {
            var db = GetRedisDb();
            var before = db.ListGetByIndex(key, index);
            db.ListInsertBefore(key, before, Serialize(item));
        }
        public void RemoveAt(int index)
        {
            var db = GetRedisDb();
            var value = db.ListGetByIndex(key, index);
            if (!value.IsNull)
            {
                db.ListRemove(key, value);
            }
        }
        public T this[int index]
        {
            get
            {
                var value = GetRedisDb().ListGetByIndex(key, index);
                return Deserialize<T>(value.ToString());
            }
            set
            {
                Insert(index, value);
            }
        }
        public void Add(T item)
        {

            GetRedisDb().ListLeftPush(key, Serialize(item));
        }
        public void Push(Process item, RedisKey k1, RedisKey k2)
        {
            //var temp = Deserialize<Process>(item.ToString());
            //var temp2 = temp.task;
            GetRedisDb().ListLeftPush(key, Serialize(item));
            GetRedisDb().ListRemove(k1, Serialize(item.task));
            GetRedisDb().ListLeftPush(k2, Serialize(item.task));
            Console.WriteLine("\nPopped {0} from task work queue and pushed to task processing queue", Serialize(item.task));
            //phoneList.Push_proc(k2, item);
        }
        public void Pop(RedisKey k1)
        {
            var popped_ele = GetRedisDb().ListRightPop(key);
            var temp = Deserialize<Process>(popped_ele.ToString());
            var temp2 = temp.task;
            GetRedisDb().ListRemove(k1, Serialize(temp2));
            Console.WriteLine("\nPopped the process: {0}",Serialize(temp));
        }
        public void Clear()
        {
            GetRedisDb().KeyDelete(key);
        }
        public bool Contains(T item)
        {
            for (int i = 0; i < Count; i++)
            {
                if (GetRedisDb().ListGetByIndex(key, i).ToString().Equals(Serialize(item)))
                {
                    return true;
                }
            }
            return false;
        }
        public void CopyTo(T[] array, int arrayIndex)
        {
            GetRedisDb().ListRange(key).CopyTo(array, arrayIndex);
        }
        public int IndexOf(T item)
        {
            for (int i = 0; i < Count; i++)
            {
                if (GetRedisDb().ListGetByIndex(key, i).ToString().Equals(Serialize(item)))
                {
                    return i;
                }
            }
            return -1;
        }
        public int Count
        {
            get { return (int)GetRedisDb().ListLength(key); }
        }
        public bool IsReadOnly
        {
            get { return false; }
        }
        public bool Remove(T item)
        {
            return GetRedisDb().ListRemove(key, Serialize(item)) > 0;
        }

        public void DisplayAll()
        {
            for (int i = 0; i < Count; i++)
            {
                Console.WriteLine(GetRedisDb().ListGetByIndex(key, i).ToString());
            }
        }
        public IEnumerator<T> GetEnumerator()
        {
            for (int i = 0; i < this.Count; i++)
            {
                yield return Deserialize<T>(GetRedisDb().ListGetByIndex(key, i).ToString());
            }
        }
        System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
        {
            for (int i = 0; i < this.Count; i++)
            {
                yield return Deserialize<T>(GetRedisDb().ListGetByIndex(key, i).ToString());
            }
        }
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...