Как задержать несколько сообщений параллельно? - PullRequest
0 голосов
/ 26 апреля 2018

Я занимаюсь программированием TCP и хочу смоделировать задержку.

Каждое «сообщение» (байт [], представляющий сериализованный объект) должно быть задержано на некоторое время t. Я думал, что у меня может быть одна функция для сбора необработанных сообщений:

private Queue<byte[]> rawMessages = new Queue<byte[]>();
private void OnDataReceived(object sender, byte[] data)
{
    rawMessages.Enqueue(data);
}

Другой метод с циклом while для непрерывного чтения из rawMessages и последующей задержки каждого из них:

private Queue<byte[]> delayedRawMessages = new Queue<byte[]>();
delayMessagesInTask = Task.Factory.StartNew(() =>
{
    while (true) //TODO: Swap a cancellation token
    {
        if(rawMessages.Count > 0){
            var rawMessage = rawMessages.Dequeue();
            //???
            //Once the message has been delayed, enqueue it into another buffer.
            delayedRawMessages.Enqueue(rawMessage);
        }
    }
});

Я подумал, что для задержки каждого сообщения у меня может быть другой метод для порождения потока, который использует Thread.Sleep(t) для ожидания времени t и затем ставит в очередь delayedRawMessages. Я уверен, что это сработает, но я думаю, что должен быть лучший способ. Проблема с подходом Thread.Sleep заключается в том, что сообщение 2 может закончиться с задержкой до сообщения 1 ... Мне, очевидно, нужно, чтобы сообщения были задержаны и выполнены в правильном порядке , иначе я бы не использовал TCP.

Я ищу способ сделать это, чтобы задержать на максимально возможное время t и не повлиять на остальную часть приложения, замедляя его.

Кто-нибудь здесь знает, как лучше это сделать?

1 Ответ

0 голосов
/ 26 апреля 2018

Я решил пойти с подходом «производитель / множественный потребитель».

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class LatencySimulator {

    public enum SimulatorType { UP, DOWN };

    public SimulatorType type;
    public int latency = 0;
    public int maxConsumers = 50;
    public BlockingCollection<byte[]> inputQueue = new BlockingCollection<byte[]>(new ConcurrentQueue<byte[]>());
    public Queue<byte[]> delayedMessagesQueue = new Queue<byte[]>();

    void CreateConsumers()
    {
        for (int i = 0; i < maxConsumers; i++)
        {
            Task.Factory.StartNew(() => Consumer(),TaskCreationOptions.LongRunning);
        }
    }

    private void Consumer()
    {
        foreach (var item in inputQueue.GetConsumingEnumerable())
        {
            Thread.Sleep(latency);  
            delayedMessagesQueue.Enqueue(item);
        }
    }
}

Чтобы использовать его, создайте новый LatencySimulator, установите его «тип», максимальное количество потребителей и время задержки для имитации. Позвоните CreateConsmers(), а затем заполните inputQueue. Когда сообщения задерживаются, они появляются в delayedMessagesQueue.

Я действительно не знаю, является ли это идеальным способом достижения моей цели, но это работает ... пока.

...