Параллельный конвейер в C # - PullRequest
3 голосов
/ 06 января 2011

Я хочу создать параллельный конвейер в C #.Я объявил интерфейс с именем IOperation:

public interface IOperation<Tin, Tout>
{
    BlockingCollection<Tout> BlockingCollection(IEnumerable<Tin> input);
}

Теперь я хочу написать класс, который выполняет несколько из этих операций параллельно.Я с этим согласен:

public class Pipeline : IPipeline
{
    private List<IOperation<Object, Object>> operations = new List<IOperation<Object, Object>>();
    private List<BlockingCollection<Object>> buffers = new List<BlockingCollection<Object>>();
    public void Register(IOperation<Object, Object> operation)
    {
        operations.Add(operation);
    }

    public void Execute()
    {

    }
}

Но я не нахожу никакого решения для сохранения операций и буферов между операциями, потому что все они имеют разные универсальные типы.У кого-нибудь есть идея?

Ответы [ 4 ]

1 голос
/ 11 ноября 2013

У Microsoft есть нечто подобное: Поток данных TPL позволяет определять блоки в конвейере с детальными элементами управления их буферизацией и распараллеливанием.

В отличие от вашего решения, оно использует полностью асинхронный push-дизайн. Он не использует BlockingCollection (конструкция с блокирующим движением) и будет значительно быстрее, если у вас глубокий конвейер.

1 голос
/ 06 января 2011

Не очень понятно, как должен работать ваш конвейер. Почему вы проходите мимо BlockingCollections? Почему вы используете дженерики, а затем вводите object в качестве типа?

Вместо этого рассмотрите вариант использования конвейера, который вы загружаете с делегатами типа Action, а затем используете параллельную библиотеку задач для создания задач, которые выполняют эти действия параллельно.

public void Register(Action operation)
    {
        operations.Add(operation);
    }

public void Execute()
    {
        foreach (var action in operations)
          Task.StartNew(operation);
    }

Но на самом деле это не «конвейер», это просто набор операций, которые выполняются параллельно.

У конвейера обычно бывают шаги конвейера с типом ввода и типом вывода. Вы могли бы справиться с этим, создав что-то вроде PipelineStep<T,U>, и вы бы сконструировали каждый шаг конвейера, передавая операцию Func. Внутри каждый шаг конвейера может потреблять входной IEnumerable и создавать выходной IEnumerable, и он может делать это с помощью Task или, проще, с помощью параллельного цикла foreach.

В качестве альтернативы вы можете использовать метод TPL Task.ContinueWith для объединения задач в цепочку от входа к выходу.

1 голос
/ 06 января 2011

Рассматривали ли вы использование Parallel.ForEach из TPL?
Библиотека параллельных задач (TPL) - это набор открытых типов и API в .NET 4.

0 голосов
/ 11 ноября 2013

На http://msdn.microsoft.com/en-us/library/ff963548.aspx есть хорошая статья о параллельных конвейерах с BlockingCollection.

Обычно каждый шаг должен иметь выходную очередь типа BlockingCollection. Он принимает элементы из очереди вывода предыдущего шага и добавляет их в свой вывод после завершения обработки.

...