Я попытался реализовать простое моделирование алгоритма организации очередей Weighted Fair.
Но я заметил проблему при использовании только 2 потоков пакетов, алгоритм не позволяет вдвое увеличить количество пакетов T1, как это должно быть, потому что они составляют половину размера пакета T2. Вместо этого он просто делает циклический перебор между двумя потоками.
Я понимаю, что проблема в том, что T1 требуется некоторое время для добавления другого пакета в очередь, а в очереди Weighted Fair есть время для запуска, когда очередь T1 пуста.
Это не похоже на ожидаемое поведение, я что-то упустил в алгоритме, который исправит этот угловой случай?
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
public class Packet
{
public string name;
public ManualResetEventSlim mres;
public long virFinish;
public long packetSize;
public Packet(string pname, long p_virFinish, long p_packetSize)
{
name = pname;
mres = new ManualResetEventSlim(false);
virFinish = p_virFinish;
packetSize = p_packetSize;
}
}
public class Queue
{
public ConcurrentQueue<Packet> q = new ConcurrentQueue<Packet>();
public long virStart;
public long lastVirFinish;
}
public class FairQueue
{
long VirtualTime; // system virtual time
ConcurrentDictionary<string, Queue> queues = new ConcurrentDictionary<string, Queue>();
public FairQueue() {
VirtualTime = long.MinValue;
queues.GetOrAdd("t1", new Queue());
queues.GetOrAdd("t2", new Queue());
queues.GetOrAdd("t3", new Queue());
}
public ManualResetEventSlim Write(string queueName, long packetSize)
{
var queue = queues[queueName];
Volatile.Write(ref queue.virStart, Math.Max(VirtualTime, queue.lastVirFinish));
var packetFinish = queue.virStart + packetSize;
var packet = new Packet(queueName, packetFinish, packetSize);
queue.q.Enqueue(packet);
Volatile.Write(ref queue.lastVirFinish, packetFinish);
UpdateVirtualClock(queue);
return packet.mres;
}
private void UpdateVirtualClock(Queue queue)
{
long minStart = queue.virStart;
foreach (var item in queues)
{
if (!item.Value.q.IsEmpty)
{
minStart = Math.Min(item.Value.virStart, minStart);
}
}
Volatile.Write(ref VirtualTime, Math.Max(VirtualTime, minStart));
}
// return queue with smallest lastVirFinish
public Queue SelectQueue()
{
long minVirFinish = long.MaxValue; // infinity
Queue selected = null;
foreach (var queue in queues)
{
if (!queue.Value.q.IsEmpty)
{
var lastVirFinish = Volatile.Read(ref queue.Value.lastVirFinish);
if (lastVirFinish < minVirFinish) {
minVirFinish = queue.Value.lastVirFinish;
selected = queue.Value;
}
}
}
return selected;
}
public Packet Send()
{
var selectedQueue = SelectQueue();
if (selectedQueue != null)
{
Packet p;
selectedQueue.q.TryDequeue(out p);
/* Set the start and the finish times of the remaining packets in the queue */
if (!selectedQueue.q.IsEmpty)
{
var next = selectedQueue.q.Take(1).First();
selectedQueue.virStart = selectedQueue.lastVirFinish;
selectedQueue.lastVirFinish = selectedQueue.virStart + next.packetSize;
}
return p;
}
else
{
return null;
}
}
}
class Program
{
static void Main(string[] args)
{
FairQueue fq = new FairQueue();
// Task that enqueue packet
var t1 = Task.Factory.StartNew(
() => {
while (true)
{
System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
sw.Start();
var s1 = fq.Write("t1", 100);
s1.Wait();
sw.Stop();
}
}
);
// Task that enqueue packet
var t2 = Task.Factory.StartNew(
() => {
while (true)
{
System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
sw.Start();
var s1 = fq.Write("t2", 200);
s1.Wait();
sw.Stop();
}
}
);
// Loop that run the Weighted Fair Queue Algorithm
while (true)
{
var toSend = fq.Send();
if (toSend != null)
{
Console.WriteLine(toSend.name);
Thread.Sleep((int)toSend.packetSize);
toSend.mres.Set();
}
}
}
}
}