Реализация поточно-ориентированной параллельной обработки - PullRequest
0 голосов
/ 05 января 2012

Я пытаюсь преобразовать существующий процесс таким образом, чтобы он поддерживал многопоточность и параллелизм, чтобы сделать решение более надежным и надежным.

Возьмите пример системы аварийного оповещения.Когда рабочий вступает в действие, создается новый объект получателя с его информацией и добавляется в коллекцию получателей.И наоборот, когда они отключаются, объект удаляется.И в фоновом режиме, когда происходит предупреждение, механизм оповещения будет перебирать один и тот же список получателей (foreach), вызывая SendAlert (...) для каждого объекта.

Вот некоторые из моих требований:

  • Добавление получателя не должно блокироваться, если выполняется оповещение.
  • Удаление получателя не должно блокироваться, если оповещение выполняется.
  • Добавление или удалениеполучатель не должен влиять на список получателей, используемых текущим оповещением.

Я просматривал классы Task и Parallel, а также классы BlockingCollection и ConcurrentQueue, но не ясно, чтолучший подход.

Это так же просто, как использование BlockingCollection?Прочитав тонну документации, я все еще не уверен, что произойдет, если Add будет вызван во время перечисления коллекции.

ОБНОВЛЕНИЕ

Коллега отослала меня кследующая статья, в которой описывается класс ConcurrentBag и поведение каждой операции:

http://www.codethinked.com/net-40-and-system_collections_concurrent_concurrentbag

Исходя из объяснения автора, представляется, что эта коллекция (почти) будет служить моим целям.Я могу сделать следующее:

  1. Создать новую коллекцию

    var получателей = новый ConcurrentBag ();

  2. Когдарабочий-час, создайте нового получателя и добавьте его в коллекцию:

    receients.Add (new Recipient ());

  3. При возникновении предупрежденияМеханизм оповещений может выполнять итерацию по коллекции в это время, потому что GetEnumerator использует снимок элементов коллекции.

    foreach (var получатель в получателях) получатель.*

    Когда работник закрывается, удалите получателя из коллекции:

    ???

ConcurrentBag не предоставляет способ удаления определенноговещь.Насколько я могу судить, ни один из параллельных классов не делает.Я что-то пропустил?Помимо этого, ConcurrentBag делает все, что мне нужно.

Ответы [ 4 ]

1 голос
/ 06 января 2012

ConcurrentBag<T> определенно должен быть лучшим классом из всех, которые вы можете использовать в таком случае. Перечисление работает в точности так, как описывает ваш друг, и поэтому оно должно хорошо послужить сценарию, который вы изложили. Однако, зная, что вы должны удалить определенные элементы из этого набора, единственный тип, который будет работать для вас, это ConcurrentDictionary<K, V>. Все остальные типы предлагают только метод TryTake, который в случае ConcurrentBag<T> является неопределенным или, в случае ConcurrentQueue<T> или ConcurrentStack<T> только для заказа.

Для вещания вы просто должны сделать:

ConcurrentDictionary<string, Recipient> myConcurrentDictionary = ...;

...

foreach(Recipient recipient in myConcurrentDictionary.Values)
{
    ...
}

Перечислитель снова является снимком словаря в этот момент.

1 голос
/ 06 января 2012

Я пришел на работу сегодня утром по электронной почте от друга, который дает мне два следующих ответа:

1 - Что касается работы коллекций в пространстве имен Concurrent, большинство из них предназначены для добавления и вычитания из коллекции без блокировки и являются поточно-ориентированными даже в процессе перечисления элементов коллекции.

В «обычной» коллекции получение перечислителя (через GetEnumerator) устанавливает значение «version», которое изменяется любой операцией, которая влияет на элементы коллекции (например, Add, Remove или Clear). Реализация IEnumerator будет сравнивать набор версий, когда он был создан, с текущей версией коллекции. Если отличается, генерируется исключение и перечисление прекращается.

Коллекции Concurrent разработаны с использованием сегментов, которые позволяют очень легко поддерживать многопоточность. Но в случае перечисления они фактически создают копию снимка коллекции во время вызова GetEnumerator, и перечислитель работает с этой копией. Это позволяет вносить изменения в коллекцию без негативных последствий для счетчика. Конечно, это означает, что перечисление ничего не будет знать об этих изменениях, но, похоже, ваш вариант использования позволяет это.

2 - Что касается конкретного сценария, который вы описываете, я не считаю, что необходим параллельный сбор. Вы можете обернуть стандартную коллекцию с помощью ReaderWriterLock и применять ту же логику, что и параллельные коллекции, когда вам нужно перечислить.

Вот что я предлагаю:

public class RecipientCollection
{
    private Collection<Recipient> _recipients = new Collection<Recipient>();
    private ReaderWriterLock _lock = new ReaderWriterLock();

    public void Add(Recipient r)
    {
        _lock.AcquireWriterLock(Timeout.Infinite);

        try
        {
            _recipients.Add(r);
        }
        finally
        {
            _lock.ReleaseWriterLock();
        }
    }

    public void Remove(Recipient r)
    {
        _lock.AcquireWriterLock(Timeout.Infinite);

        try
        {
            _recipients.Remove(r);
        }
        finally
        {
            _lock.ReleaseWriterLock();
        }
    }

    public IEnumerable<Recipient> ToEnumerable()
    {
        _lock.AcquireReaderLock(Timeout.Infinite);

        try
        {
            var list = _recipients.ToArray();

            return list;
        }
        finally
        {
            _lock.ReleaseReaderLock();
        }
    }
}

ReaderWriterLock гарантирует, что операции блокируются, только если выполняется другая операция, которая изменяет содержимое коллекции. Как только эта операция завершается, блокировка снимается, и следующая операция может продолжаться.

Ваш механизм предупреждений будет использовать метод ToEnumerable (), чтобы получить копию снимка коллекции в то время и перечислить копию.

В зависимости от того, как часто отправляется оповещение и вносятся изменения в коллекцию, это может быть проблемой, но вы все равно сможете реализовать некоторый тип свойства версии, который изменяется при добавлении или удалении элемента и оповещении Движок может проверить это свойство, чтобы узнать, нужно ли ему снова вызывать ToEnumerable () для получения последней версии. Или инкапсулируйте это, кэшируя массив внутри класса RecipientCollection и аннулируя кеш при добавлении или удалении элемента.

НТН

0 голосов
/ 06 января 2012

Ваши требования мне подходят как способ запуска стандартных событий .NET в C #.Я не знаю, если синтаксис VB компилируется в подобный код или нет.Стандартный шаблон выглядит примерно так:

public event EventHandler Triggered;
protected void OnTriggered()
{
    //capture the list so that you don't see changes while the
    //event is being dispatched.
    EventHandler h = Triggered;
    if (h != null)
        h(this, EventArgs.Empty);
}

В качестве альтернативы вы можете использовать неизменяемый класс списка для хранения получателей.Затем при отправке оповещения он сначала берет текущий список и использует его в качестве «снимка», который нельзя изменить путем добавления и удаления во время отправки оповещения.Например:

class Alerter
{
    private ImmutableList<Recipient> recipients;

    public void Add(Recipient recipient)
    {
        recipients = recipients.Add(recipient);
    }

    public void Remove(Recipient recipient)
    {
        recipients = recipients.Remove(recipient);
    }

    public void SendAlert()
    {
        //make a local reference to the current list so
        //you are not affected by any calls to Add/Remove
        var current = recipients;
        foreach (var r in current)
        {
            //send alert to r
        }
    }
}

Вам нужно будет найти реализацию ImmutableList, но вы сможете найти несколько без особой работы.В методе SendAlert, как я его написал, мне, вероятно, не нужно было явно указывать локальный код, чтобы избежать проблем, поскольку цикл foreach сделал бы это сам, но я думаю, что копия проясняет намерение.

0 голосов
/ 06 января 2012

В реализации, подобной этой, гораздо больше, чем просто аспекты параллельной обработки, при этом долговечность, вероятно, является главной среди них. Рассматривали ли вы создание этого с использованием существующей технологии PubSub, например, скажем ... Темы Azure или NServiceBus ?

...