Я реализовал систему очередей Redis, которая обрабатывает задачи. Задачи изначально помещаются в рабочую очередь. Всякий раз, когда процесс требует задачи, задача помещается в очередь обработки. Всякий раз, когда процесс завершает выполнение, соответствующая задача из очереди обработки удаляется. В случае сбоя процесса на полпути выполнения процесс извлекается из очереди обработки обратно в рабочую очередь.
В этой реализации я также симулировал процессы, используя redis-list, хотя это будет не так. Очередь процессов обрабатывает список активных процессов. Если процесс завершится неудачно, соответствующий тайм-аут задачи выведет его из очереди обработки обратно в рабочую очередь.
Каждый раз, когда я извлекаю элемент из какой-либо очереди или проверяю время ожидания, мне нужно перебрать каждый элемент очереди и выполнить обработку. Есть ли в любом случае, где я могу оптимизировать эти задачи, чтобы сказать O (1), как с помощью пары ключ-значение. Пожалуйста, предложите улучшения.
Класс телефона: // для эмуляции задачи
namespace RedisConsoleApp1
{
public class Phone
{
public string ID { get; set; }
public string Model { get; set; }
public DateTime ProcessStartTime { get; set; }
}
}
Класс процесса:
namespace RedisConsoleApp1
{
public class Process
{
public string ID { get; set; }
public Phone task {get; set;}
}
}
Program.cs:
namespace RedisConsoleApp1
{
public class RedisList_WorkTask<T> : IList<T>
{
private static ConnectionMultiplexer _cnn;
public string key;
public RedisList_WorkTask(string key)
{
this.key = key;
_cnn = ConnectionMultiplexer.Connect("localhost");
}
private IDatabase GetRedisDb()
{
return _cnn.GetDatabase();
}
private string Serialize(object obj)
{
return JsonConvert.SerializeObject(obj);
}
private T Deserialize<T>(string serialized)
{
return JsonConvert.DeserializeObject<T>(serialized);
}
public void Insert(int index, T item)
{
var db = GetRedisDb();
var before = db.ListGetByIndex(key, index);
db.ListInsertBefore(key, before, Serialize(item));
}
public void RemoveAt(int index)
{
var db = GetRedisDb();
var value = db.ListGetByIndex(key, index);
if (!value.IsNull)
{
db.ListRemove(key, value);
}
}
public T this[int index]
{
get
{
var value = GetRedisDb().ListGetByIndex(key, index);
return Deserialize<T>(value.ToString());
}
set
{
Insert(index, value);
}
}
public void Add(T item)
{
GetRedisDb().ListLeftPush(key, Serialize(item));
}
public void Push(T item)
{
GetRedisDb().ListLeftPush(key, Serialize(item));
}
public void Pop()
{
var popped_ele = GetRedisDb().ListRightPop(key);
//var popped_ele = GetRedisDb().ListRightPopLeftPush(key, proc_q);
//var curr_obj = Deserialize<Phone>(popped_ele.ToString());
Console.WriteLine("Popped {0}", popped_ele);
//Console.WriteLine(curr_obj.Model);
}
public void Push_proc(RedisKey proc_q, T item)
{
var db = GetRedisDb();
//var popped_ele = GetRedisDb().ListRightPop(key);
var popped_ele = Remove(item);
db.ListLeftPush(proc_q, popped_ele);
//var curr_obj = Deserialize<Phone>(popped_ele.ToString());
Console.WriteLine("Popped {0}",popped_ele);
//Console.WriteLine(curr_obj.Model);
}
public void Clear()
{
GetRedisDb().KeyDelete(key);
}
public bool Contains(T item)
{
for (int i = 0; i < Count; i++)
{
if (GetRedisDb().ListGetByIndex(key, i).ToString().Equals(Serialize(item)))
{
return true;
}
}
return false;
}
public void CopyTo(T[] array, int arrayIndex)
{
GetRedisDb().ListRange(key).CopyTo(array, arrayIndex);
}
public int IndexOf(T item)
{
for (int i = 0; i < Count; i++)
{
if (GetRedisDb().ListGetByIndex(key, i).ToString().Equals(Serialize(item)))
{
return i;
}
}
return -1;
}
public int Count
{
get { return (int)GetRedisDb().ListLength(key); }
}
public bool IsReadOnly
{
get { return false; }
}
public bool Remove(T item)
{
return GetRedisDb().ListRemove(key, Serialize(item)) > 0;
}
public IEnumerator<T> GetEnumerator()
{
for (int i = 0; i < this.Count; i++)
{
yield return Deserialize<T>(GetRedisDb().ListGetByIndex(key, i).ToString());
}
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
for (int i = 0; i < this.Count; i++)
{
yield return Deserialize<T>(GetRedisDb().ListGetByIndex(key, i).ToString());
}
}
}
public class Program
{
static void Main(string[] args)
{
//creating the queues
var phoneList = new RedisList_WorkTask<Phone>("workTaskList");
var phoneList_2 = new RedisList_ProcessingTask<Phone>("processingTaskList");
var processList = new RedisList_Process<Process>("processList");
//clearing up the queues
phoneList.Clear();
phoneList_2.Clear();
processList.Clear();
//creating sample tasks
Phone p1 = new Phone() { ID = "1", Model = "abc", ProcessStartTime = DateTime.Now };
Phone p2 = new Phone() { ID = "2", Model = "def", ProcessStartTime = DateTime.Now };
Phone p3 = new Phone() { ID = "3", Model = "ghi", ProcessStartTime = DateTime.Now };
Phone p4 = new Phone() { ID = "4", Model = "jkl", ProcessStartTime = DateTime.Now };
Phone p5 = new Phone() { ID = "5", Model = "mno", ProcessStartTime = DateTime.Now };
Phone p6 = new Phone() { ID = "6", Model = "pqr", ProcessStartTime = DateTime.Now };
//creating the work queue
Console.WriteLine("Creating the task working queue.....\n");
phoneList.Push(p1);
phoneList.Push(p2);
phoneList.Push(p3);
phoneList.Push(p4);
phoneList.Push(p5);
phoneList.Push(p6);
Console.WriteLine("\nCount in task working queue: {0}", phoneList.Count);
Console.WriteLine("\nCreating processes....");
//creating the processes which fills up the task processing queue
processList.Push(new Process { ID = "1", task = p1 }, phoneList.key, phoneList_2.key);
processList.Push(new Process { ID = "2", task = p2 }, phoneList.key, phoneList_2.key);
Console.WriteLine("\nNo of processes: {0}", processList.Count);
//phoneList.Pop();
//phoneList.Pop();
//phoneList.Pop();
Console.WriteLine("\nCount in task processing queue: {0}", phoneList_2.Count);
Console.WriteLine("\nCount in task working queue: {0}", phoneList.Count);
Console.WriteLine("\nReached the end!");
Console.WriteLine("\nAll tasks currently in processing queue: ");
phoneList_2.DisplayAll();
//popping out timed-out tasks from task-processing queue to task work que
//phoneList_2.RefreshQueue(phoneList.key);
processList.Pop(phoneList_2.key);
//while (true)
//{
// phoneList_2.RefreshQueue(phoneList.key);
//}
Console.ReadKey();
}
}
}
Additional.cs:
namespace RedisConsoleApp1
{
public class RedisList_ProcessingTask<T> : IList<T>
{
private static ConnectionMultiplexer _cnn;
public string key;
public RedisList_ProcessingTask(string key)
{
this.key = key;
_cnn = ConnectionMultiplexer.Connect("localhost");
}
private IDatabase GetRedisDb()
{
return _cnn.GetDatabase();
}
private string Serialize(object obj)
{
return JsonConvert.SerializeObject(obj);
}
private T Deserialize<T>(string serialized)
{
return JsonConvert.DeserializeObject<T>(serialized);
}
public void Insert(int index, T item)
{
var db = GetRedisDb();
var before = db.ListGetByIndex(key, index);
db.ListInsertBefore(key, before, Serialize(item));
}
public void RemoveAt(int index)
{
var db = GetRedisDb();
var value = db.ListGetByIndex(key, index);
if (!value.IsNull)
{
db.ListRemove(key, value);
}
}
public T this[int index]
{
get
{
var value = GetRedisDb().ListGetByIndex(key, index);
return Deserialize<T>(value.ToString());
}
set
{
Insert(index, value);
}
}
public void Add(T item)
{
GetRedisDb().ListLeftPush(key, Serialize(item));
}
public void Push(T item)
{
GetRedisDb().ListLeftPush(key, Serialize(item));
}
public void Push_work(RedisKey k2, Phone item)
{
GetRedisDb().ListLeftPush(k2, Serialize(item));
GetRedisDb().ListRemove(key, Serialize(item));
}
public void Pop()
{
var popped_ele = GetRedisDb().ListRightPop(key);
Console.WriteLine(popped_ele);
}
public void Clear()
{
GetRedisDb().KeyDelete(key);
}
public bool Contains(T item)
{
for (int i = 0; i < Count; i++)
{
if (GetRedisDb().ListGetByIndex(key, i).ToString().Equals(Serialize(item)))
{
return true;
}
}
return false;
}
public void CopyTo(T[] array, int arrayIndex)
{
GetRedisDb().ListRange(key).CopyTo(array, arrayIndex);
}
public int IndexOf(T item)
{
for (int i = 0; i < Count; i++)
{
if (GetRedisDb().ListGetByIndex(key, i).ToString().Equals(Serialize(item)))
{
return i;
}
}
return -1;
}
public int Count
{
get { return (int)GetRedisDb().ListLength(key); }
}
public bool IsReadOnly
{
get { return false; }
}
public bool Remove(T item)
{
return GetRedisDb().ListRemove(key, Serialize(item)) > 0;
}
public void DisplayAll()
{
for (int i = 0; i < Count; i++)
{
Console.WriteLine(GetRedisDb().ListGetByIndex(key, i).ToString());
}
}
public void RefreshQueue(RedisKey work_q)
{
for (int i = 0; i < Count; i++)
{
//var temp_obj = GetRedisDb().ListGetByIndex(key, i).ToString();
//var curr_obj = Deserialize<T>(temp_obj);
var temp_obj = GetRedisDb().ListGetByIndex(key, i);
var curr_obj = Deserialize<Phone>(temp_obj.ToString());
var cur_time = DateTime.Now;
TimeSpan value = cur_time.Subtract(curr_obj.ProcessStartTime);
if (value > new TimeSpan(0, 0, 10))
{
Console.WriteLine("\nPushing {0} from task-processing queue to " +
"task-work queue due to timeout of {1}", Serialize(curr_obj), value);
Push_work(work_q, curr_obj);
}
}
}
public IEnumerator<T> GetEnumerator()
{
for (int i = 0; i < this.Count; i++)
{
yield return Deserialize<T>(GetRedisDb().ListGetByIndex(key, i).ToString());
}
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
for (int i = 0; i < this.Count; i++)
{
yield return Deserialize<T>(GetRedisDb().ListGetByIndex(key, i).ToString());
}
}
}
}
Additional2.cs:
namespace RedisConsoleApp1
{
public class RedisList_Process<T> : IList<T>
{
private static ConnectionMultiplexer _cnn;
public string key;
public RedisList_Process(string key)
{
this.key = key;
_cnn = ConnectionMultiplexer.Connect("localhost");
}
private IDatabase GetRedisDb()
{
return _cnn.GetDatabase();
}
private string Serialize(object obj)
{
return JsonConvert.SerializeObject(obj);
}
private T Deserialize<T>(string serialized)
{
return JsonConvert.DeserializeObject<T>(serialized);
}
public void Insert(int index, T item)
{
var db = GetRedisDb();
var before = db.ListGetByIndex(key, index);
db.ListInsertBefore(key, before, Serialize(item));
}
public void RemoveAt(int index)
{
var db = GetRedisDb();
var value = db.ListGetByIndex(key, index);
if (!value.IsNull)
{
db.ListRemove(key, value);
}
}
public T this[int index]
{
get
{
var value = GetRedisDb().ListGetByIndex(key, index);
return Deserialize<T>(value.ToString());
}
set
{
Insert(index, value);
}
}
public void Add(T item)
{
GetRedisDb().ListLeftPush(key, Serialize(item));
}
public void Push(Process item, RedisKey k1, RedisKey k2)
{
//var temp = Deserialize<Process>(item.ToString());
//var temp2 = temp.task;
GetRedisDb().ListLeftPush(key, Serialize(item));
GetRedisDb().ListRemove(k1, Serialize(item.task));
GetRedisDb().ListLeftPush(k2, Serialize(item.task));
Console.WriteLine("\nPopped {0} from task work queue and pushed to task processing queue", Serialize(item.task));
//phoneList.Push_proc(k2, item);
}
public void Pop(RedisKey k1)
{
var popped_ele = GetRedisDb().ListRightPop(key);
var temp = Deserialize<Process>(popped_ele.ToString());
var temp2 = temp.task;
GetRedisDb().ListRemove(k1, Serialize(temp2));
Console.WriteLine("\nPopped the process: {0}",Serialize(temp));
}
public void Clear()
{
GetRedisDb().KeyDelete(key);
}
public bool Contains(T item)
{
for (int i = 0; i < Count; i++)
{
if (GetRedisDb().ListGetByIndex(key, i).ToString().Equals(Serialize(item)))
{
return true;
}
}
return false;
}
public void CopyTo(T[] array, int arrayIndex)
{
GetRedisDb().ListRange(key).CopyTo(array, arrayIndex);
}
public int IndexOf(T item)
{
for (int i = 0; i < Count; i++)
{
if (GetRedisDb().ListGetByIndex(key, i).ToString().Equals(Serialize(item)))
{
return i;
}
}
return -1;
}
public int Count
{
get { return (int)GetRedisDb().ListLength(key); }
}
public bool IsReadOnly
{
get { return false; }
}
public bool Remove(T item)
{
return GetRedisDb().ListRemove(key, Serialize(item)) > 0;
}
public void DisplayAll()
{
for (int i = 0; i < Count; i++)
{
Console.WriteLine(GetRedisDb().ListGetByIndex(key, i).ToString());
}
}
public IEnumerator<T> GetEnumerator()
{
for (int i = 0; i < this.Count; i++)
{
yield return Deserialize<T>(GetRedisDb().ListGetByIndex(key, i).ToString());
}
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
for (int i = 0; i < this.Count; i++)
{
yield return Deserialize<T>(GetRedisDb().ListGetByIndex(key, i).ToString());
}
}
}
}