Я пытаюсь реализовать шаблон Pipe and Filter, интегрирующий в него поток данных TPL. У меня возникли проблемы, когда не все мои результаты были получены. Например, я вложил в конвейер 99999 предметов, и вышло только 85238.
EmployeeModel.cs
public class EmployeeModel
{
public String FirstName { get; set; }
public String LastName { get; set; }
public String FullName { get; set; }
public override String ToString()
{
return $"FirstName: {FirstName}\nLastName: {LastName}\nFullName: {FullName}\n";
}
}
IFilter.cs
public interface IFilter<T>
{
T Execute(T input);
}
AbstractParallelFilter.cs
public abstract class AbstractParallelFilter<T> : IFilter<T>
{
public AbstractParallelFilter()
{
TransformBlock = new TransformBlock<T, T>(new Func<T, T>(Execute), new ExecutionDataflowBlockOptions()
{
BoundedCapacity = DataflowBlockOptions.Unbounded,
MaxDegreeOfParallelism = Environment.ProcessorCount
});
}
public abstract T Execute(T input);
internal TransformBlock<T, T> TransformBlock { get; private set; }
}
IParallelPipeline.cs
public interface IParallelPipeline<T>
{
IParallelPipeline<T> Register(AbstractParallelFilter<T> filter);
IParallelPipeline<T> CompleteRegisteration();
IParallelPipeline<T> Process(T input);
Task CompleteProcessing();
ConcurrentBag<T> Results { get; set; }
}
AbstractParallelPipeline.cs
public abstract class AbstractParallelPipeline<T>: IParallelPipeline<T>
{
public AbstractParallelPipeline()
{
filters = new List<AbstractParallelFilter<T>>();
Results = new ConcurrentBag<T>();
}
public IParallelPipeline<T> Register(AbstractParallelFilter<T> filter)
{
filters.Add(filter);
return this;
}
public abstract IParallelPipeline<T> Process(T input);
public Task CompleteProcessing()
{
if (filters.Count == 0)
throw new Exception("No filters have been registered");
filters.First().TransformBlock.Complete();
return filters.Last().TransformBlock.Completion;
}
public IParallelPipeline<T> CompleteRegisteration()
{
if (filters.Count < 2)
{
return this;
}
else
{
for (int i = filters.Count - 2; i >= 0; i--)
{
filters[i].TransformBlock.LinkTo(filters[i + 1].TransformBlock, new DataflowLinkOptions() { PropagateCompletion = true });
}
ActionBlock<T> dumpBlock = new ActionBlock<T>(x => Results.Add(x));
filters.Last().TransformBlock.LinkTo(dumpBlock, new DataflowLinkOptions() { PropagateCompletion = true });
}
return this;
}
public IList<AbstractParallelFilter<T>> filters;
public ConcurrentBag<T> Results { get; set; }
}
ParallelPipeline.cs
public class ParallelPipeline<T> : AbstractParallelPipeline<T>
{
public override IParallelPipeline<T> Process(T input)
{
filters.First().TransformBlock.Post(input);
return this;
}
}
Program.cs
class Program
{
static void Main(string[] args)
{
List<EmployeeModel> employeeModels = new List<EmployeeModel>();
int count = 99999;
for (int i = 0; i < count; i++)
{
EmployeeModel employee = new EmployeeModel()
{
FirstName = NameGenerator.GenerateFirstName(Gender.Female),
LastName = NameGenerator.GenerateLastName()
};
employeeModels.Add(employee);
}
IParallelPipeline<EmployeeModel> parallelPipeline = new ParallelPipeline<EmployeeModel>()
.Register(new ParallelFirstNameToUpperFilter())
.Register(new ParallelLastNameToUpperFilter())
.Register(new ParallelFullNameConcatFilter())
.CompleteRegisteration();
for (int i = 0; i < count; i++)
{
parallelPipeline.Process(employeeModels[i]);
}
parallelPipeline
.CompleteProcessing()
.Wait();
Console.WriteLine(parallelPipeline.Results.Count);
Console.Read();
}
}