Обеспечение порядка выполнения задач с использованием параллельных библиотек задач .NET 4.0 - PullRequest
3 голосов
/ 29 октября 2010

У меня есть программа, которая имеет тонну датчиков, генерирующих данные с довольно высокой скоростью, и потребителей, которые должны их использовать.Потребители потребляют по-разному.

Поскольку я использую IObserver / IObservable, тривиальным решением было просто создать задачу для каждого события и обернуть вызов OnNext () и данные в лямду.Это сработало очень хорошо, я был удивлен, насколько мало было накладных расходов на необработанные вызовы.

Проблема в том, что некоторым из этих потребителей нужен строго соблюдаемый порядок событий, и они не могут пропустить ни одного события.«PerferFairness» не достаточно хорош.

Лучшее решение, которое я придумаю, - это вместо того, чтобы обернуть пару событие / OnNext (), чтобы обернуть вставку в ParallelQueue, по одной очереди на потребителяпоток на другом конце очереди для выполнения вызовов OnNext ().

Три немедленные проблемы с этим подходом.Это намного, намного медленнее, чем решение для обёртывания Task / OnNext ().Для ParallelQueue нет блокировки блокировки (или есть?), Поэтому реализация немного сложнее.В-третьих, это кажется настолько распространенной проблемой, что я не могу себе представить, что нет какого-то способа обеспечения порядка, который я пропустил, возможно, что-то вроде нескольких фабрик задач, совместно использующих базовый пул, каждая фабрика с некоторыми настройками, которые заставляют их строго применятьorder.

Кто-нибудь знает правильный способ достижения того, что я пытаюсь сделать?

РЕДАКТИРОВАТЬ: Любое решение, которое включает в себя поток для каждого потребителя или производителя не работает.Производители / Потребители образуют длинные цепочки, каждая из которых насчитывает сотни.

Ответы [ 2 ]

4 голосов
/ 07 ноября 2011

Библиотека TPL DataFlow может хорошо подойти для вашего приложения. Он расширяет TPL с помощью парадигмы потока данных, которая позволяет настраивать граф обработки и работать в высокопроизводительной среде выполнения.

Поток данных TPL доступен в виде библиотеки поверх .NET 4 и должен поставляться как часть .NET 4.5.

1 голос
/ 29 октября 2010

Не комментируйте лучшую абстракцию для принудительного частичного упорядочения, но если вы используете оболочку BlockingCollection<T> вокруг ConcurrentQueue<T>, это даст вам блокирующую операцию Take для удаления элементов. e.g.:

// the default is ConcurrentQueue, so you don't have to specify, but if you
// wanted different behavior you could use e.g. ConcurrentStack

var coll = new BlockingCollection<int>(new ConcurrentQueue<int>());

coll.Add(5); // blocks if the collection is at max capacity

int five = coll.Take(); // blocks if the collection is empty
...