Какой тип IProducerConsumerCollection <T>использовать для моей задачи? - PullRequest
3 голосов
/ 30 апреля 2011

У меня ровно 100 датчиков, каждый из которых "измеряет" свои данные.У меня есть ровно один DataSender , который должен отправлять информацию с «датчиков».Самая последняя информация должна быть отправлена.

Пропускная способность канала может быть меньше, чем данные, полученные от 100 датчиков.В этом случае некоторые данные могут быть пропущены, но мы должны быть «примерно честными».Например, мы можем пропустить каждое второе измерение для каждого датчика.

Я не знаю, как часто каждый датчик генерирует данные, но в целом они генерируют данные довольно часто.

После моих других сообщений:

Я решил, что у меня классическая проблема Производитель / Потребитель , с:

  • 100 Производителями и
  • 1 Потребителем

Мне предложили использовать для этого BlockingCollection.Единственная проблема с BlockingCollection - после добавления элемента вы не можете заменить его.Но в моем приложении, если датчик выдает новое значение, а предыдущее значение не было обработано Consumer, значение должно быть заменено .

Если я использую, используйте ConcurentDictionary или ConcurentBag для этой задачи?

Концептуально, все, что мне нужно - это массив из 100 элементов.

Датчик # 33 должен заменить его значение в массиве [33]:

| Sensor | Value |
|--------|-------|
|      1 |       |
|      2 |       |
|      3 |       |
/......../......./
|     32 |       |
|     33 | 101.9 |
|     34 |       |
/......../......./
|     98 |       |
|     99 |       |
|    100 |       |

Consumer должен принять значение от array[33] и, если не ноль, отправить его и установить массив [33] до нуля.Consumer должен реагировать на любые ненулевые значения в массиве как можно скорее.

Ответы [ 2 ]

4 голосов
/ 30 апреля 2011

Я думаю, вы должны реализовать свой IProducerConsumerCollection<T>.Вот почему это интерфейс: так что вы можете легко создать свой собственный.

Вы можете сделать это, используя Dictionary<K,V> и Queue<T>, чтобы убедиться, что получение данных справедливо, т.е. если у вас есть только одно устройство, котороегенерирует данные очень быстро, вы не будете отправлять данные только с этого.

public class DeviceDataQueue<TDevice, TData>
    : IProducerConsumerCollection<Tuple<TDevice, TData>>
{
    private readonly object m_lockObject = new object();
    private readonly Dictionary<TDevice, TData> m_data
        = new Dictionary<TDevice, TData>();
    private readonly Queue<TDevice> m_queue = new Queue<TDevice>();

    //some obviously implemented methods elided, just make sure they are thread-safe

    public int Count { get { return m_queue.Count; } }

    public object SyncRoot { get { return m_lockObject; } }

    public bool IsSynchronized { get { return true; } }

    public bool TryAdd(Tuple<TDevice, TData> item)
    {
        var device = item.Item1;
        var data = item.Item2;

        lock (m_lockObject)
        {
            if (!m_data.ContainsKey(device))
                m_queue.Enqueue(device);

            m_data[device] = data;
        }

        return true;
    }

    public bool TryTake(out Tuple<TDevice, TData> item)
    {
        lock (m_lockObject)
        {
            if (m_queue.Count == 0)
            {
                item = null;
                return false;
            }

            var device = m_queue.Dequeue();
            var data = m_data[device];
            m_data.Remove(device);
            item = Tuple.Create(device, data);
            return true;
        }
    }
}

При использовании по следующим линиям:

Queue = new BlockingCollection<Tuple<IDevice, Data>>(
    new DeviceDataQueue<IDevice, Data>());

Device1 = new Device(1, TimeSpan.FromSeconds(3), Queue);
Device2 = new Device(2, TimeSpan.FromSeconds(5), Queue);

while (true)
{
    var tuple = Queue.Take();
    var device = tuple.Item1;
    var data = tuple.Item2;

    Console.WriteLine("{0}: Device {1} produced data at {2}.",
        DateTime.Now, device.Id, data.Created);

    Thread.Sleep(TimeSpan.FromSeconds(2));
}

выдает следующий вывод:

30.4.2011 20:40:43: Device 1 produced data at 30.4.2011 20:40:43.
30.4.2011 20:40:45: Device 2 produced data at 30.4.2011 20:40:44.
30.4.2011 20:40:47: Device 1 produced data at 30.4.2011 20:40:47.
30.4.2011 20:40:49: Device 2 produced data at 30.4.2011 20:40:49.
30.4.2011 20:40:51: Device 1 produced data at 30.4.2011 20:40:51.
30.4.2011 20:40:54: Device 2 produced data at 30.4.2011 20:40:54.
1 голос
/ 30 апреля 2011

Вместо использования другой структуры данных, сделайте еще один трюк. Элемент в вашей коллекции не может быть заменен, но вы можете вместо сохранения фактического значения сохранить мини-контейнер. Когда вы хотите заменить, вы фактически заменяете значение в контейнере, а не заменяете контейнер.

class ElementFromQueue
{
     public object SensorData;
}

...

ElementFromQueue elem = new ElementFromQueue();
elem.SensorData = new object();
...
queue.Add(elem); //Element is in queue now
...
elem.SensorData = new object(); //Update the data, simulating replace

Или просто создайте очередь индексов, которые будут указывать на номер датчика. При извлечении значения последнее значение датчика запрашивается из другого обновляемого набора

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...