Не очень понятно, как должен работать ваш конвейер. Почему вы проходите мимо BlockingCollections? Почему вы используете дженерики, а затем вводите object
в качестве типа?
Вместо этого рассмотрите вариант использования конвейера, который вы загружаете с делегатами типа Action
, а затем используете параллельную библиотеку задач для создания задач, которые выполняют эти действия параллельно.
public void Register(Action operation)
{
operations.Add(operation);
}
public void Execute()
{
foreach (var action in operations)
Task.StartNew(operation);
}
Но на самом деле это не «конвейер», это просто набор операций, которые выполняются параллельно.
У конвейера обычно бывают шаги конвейера с типом ввода и типом вывода. Вы могли бы справиться с этим, создав что-то вроде PipelineStep<T,U>
, и вы бы сконструировали каждый шаг конвейера, передавая операцию Func. Внутри каждый шаг конвейера может потреблять входной IEnumerable и создавать выходной IEnumerable, и он может делать это с помощью Task или, проще, с помощью параллельного цикла foreach.
В качестве альтернативы вы можете использовать метод TPL Task.ContinueWith
для объединения задач в цепочку от входа к выходу.