Параллельная обработка с использованием TPL в службе Windows - PullRequest
0 голосов
/ 25 февраля 2019

У меня есть служба Windows, которая использует систему обмена сообщениями для получения сообщений.Я также создал механизм обратного вызова с помощью класса Timer, который помогает мне проверять сообщение через некоторое фиксированное время для извлечения и обработки.Ранее служба обрабатывает сообщение по одному.Но я хочу, чтобы после того, как пришло сообщение, механизм обработки выполнялся параллельно.Таким образом, если первое сообщение пришло, оно должно быть обработано для одной задачи, и даже если обработка первого сообщения еще не завершена после интервала времени, настроенного с использованием метода обратного вызова (обратный вызов работает сейчас), следующее сообщение должно быть выбрано и обработанодругая задача.

Ниже приведен мой код:

Task.Factory.StartNew(() =>
{
    Subsriber<Message> subsriber = new Subsriber<Message>()
    {
       Interval = 1000
    };

    subsriber.Callback(Process, m => m != null);
});

public static void Process(Message message)
{
  if (message != null)
  {
     // Processing logic
  }
 else
 {

 }
}

Но с помощью фабрики задач я не могу контролировать количество задач параллельно, поэтому в моем случае я хочу настроитьколичество задач, по которым сообщения будут запускаться при наличии задач?


Обновление: обновлен мой вышеприведенный код для добавления нескольких задач

Ниже приведен код:

         private static void Main()
        {
            try
            {
                int taskCount = 5;
                Task.Factory.StartNewAsync(() =>
                {
                   Subscriber<Message> consumer = new 
                   Subcriber<Message>()
                   {
                       Interval = 1000
                    };

                   consumer.CallBack(Process, msg => msg!= 
                   null);
                 }, taskCount);
                Console.ReadLine();
              }
             catch (Exception e)
            {
                 Console.WriteLine(e.Message);
            }

            public static void StartNewAsync(this TaskFactory 
            target, Action action, int taskCount)
           {
                 var tasks = new Task[taskCount];
                 for (int i = 0; i < taskCount; i++)
                 {
                      tasks[i] = target.StartNew(action);
                 }
             }

             public static void Process(Message message)
            {
                 if (message != null)
                {

                 }
                else
                { }
             }
        }

Ответы [ 2 ]

0 голосов
/ 27 февраля 2019

Хорошо, извините, у меня мало времени, но вот общая идея / основа того, что я думал в качестве альтернативы.

Если честно, хотя я думаю, что ActionBlock<T> - лучший вариант, поскольку для вас так много сделано, единственное ограничение - вы не можете динамически масштабировать объем работы, который он будет выполнять.это когда-то, хотя я думаю, что предел может быть довольно высоким.Если вы начнете делать это таким образом, у вас будет больше контроля или просто будет запущено какое-то динамическое количество задач, но вам придется много чего делать вручную, например, если вы хотите ограничить количество задач, выполняемых приВремя, вам придется внедрить систему очередей (что-то, что ActionBlock обрабатывает для вас), а затем поддерживать ее.Я думаю, это зависит от того, сколько сообщений вы получаете и как быстро ваш процесс обрабатывает их.

Вам придется проверить это и подумать о том, как это может применяться к вашему прямому варианту использования, так как я думаю, что некоторыеобласти деталей, немного схематично реализованной на моей стороне вокруг идеи параллельного пакета.

Таким образом, идея, которую я создал здесь, заключается в том, что вы можете запускать любое количество задач или добавлять к выполняющимся задачам илиОтмените задачи индивидуально, используя коллекцию.

Главное, я думаю, это просто заставить метод, который Callback запускает, запускать поток, который выполняет работу, вместо подписки в отдельном потоке.

Я использовал Task.Factory.StartNew, как и вы, но сохранил возвращенный объект Task в объекте (TaskInfo), который также имеет свой CancellationTokenSource, его Id (назначенный извне) в качестве свойств, а затем добавил это кколлекция TaskInfo, которая является свойством класса, все это является частью:

Обновлено - чтобы избежать путаницы, я только что обновил код, который был здесь ранее.

Вам придется обновить биты и заполнить пробелы в таких местах, как все, что у вас есть для моего HeartbeatController, и нескольких событиях, которые вызываются, потому что они выходят за рамкивопрос, но идея была бы той же.

    public class TaskContainer
{
    private ConcurrentBag<TaskInfo> Tasks;
    public TaskContainer(){
        Tasks = new ConcurrentBag<TaskInfo>();
    }


//entry point
//UPDATED
public void StartAndMonitor(int processorCount)
{

        for (int i = 0; i <= processorCount; i++)
        {
            Processor task = new Processor(ProcessorId = i);
            CreateProcessorTask(task);
        }
        this.IsRunning = true;
        MonitorTasks();
}

private void CreateProcessorTask(Processor processor)
{
    CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();

    Task taskInstance = Task.Factory.StartNew(
        () => processor.Start(cancellationTokenSource.Token)
    );
    //bind status update event
    processor.ProcessorStatusUpdated += ReportProcessorProcess;

    Tasks.Add(new ProcessorInfo()
    {
        ProcessorId = processor.ProcessorId,
        Task = taskInstance,
        CancellationTokenSource = cancellationTokenSource
    });

}

//this method gets called once but the HeartbeatController gets an action as a param that it then
//executes on a timer. I haven't included that but you get the idea

//This method also checks for tasks that have stopped and restarts them if the manifest call says they should be running.
//Will also start any new tasks included in the manifest and stop any that aren't included in the manifest.
internal void MonitorTasks()
    {
        HeartbeatController.Beat(() =>
        {
            HeartBeatHappened?.Invoke(this, null);
            List<int> tasksToStart = new List<int>();

            //this is an api call or whatever drives your config that says what tasks must be running.
            var newManifest = this.GetManifest(Properties.Settings.Default.ResourceId);

            //task Removed Check - If a Processor is removed from the task pool, cancel it if running and remove it from the Tasks List.
            List<int> instanceIds = new List<int>();
            newManifest.Processors.ForEach(x => instanceIds.Add(x.ProcessorId));
            var removed = Tasks.Select(x => x.ProcessorId).ToList().Except(instanceIds).ToList();

            if (removed.Count() > 0)
            {
                foreach (var extaskId in removed)
                {
                    var task = Tasks.FirstOrDefault(x => x.ProcessorId == extaskId);
                    task.CancellationTokenSource?.Cancel();
                }
            }

            foreach (var newtask in newManifest.Processors)
            {
                var oldtask = Tasks.FirstOrDefault(x => x.ProcessorId == newtask.ProcessorId);
                //Existing task check
                if (oldtask != null && oldtask.Task != null)
                {
                    if (!oldtask.Task.IsCanceled && (oldtask.Task.IsCompleted || oldtask.Task.IsFaulted))
                    {
                        var ex = oldtask.Task.Exception;

                        tasksToStart.Add(oldtask.ProcessorId);
                        continue;
                    }
                }
                else //New task Check                       
                    tasksToStart.Add(newtask.ProcessorId);


            }

            foreach (var item in tasksToStart)
            {
                var taskToRemove = Tasks.FirstOrDefault(x => x.ProcessorId == item);
                if (taskToRemove != null)
                    Tasks.Remove(taskToRemove);

                var task = newManifest.Processors.FirstOrDefault(x => x.ProcessorId == item);
                if (task != null)
                {
                    CreateProcessorTask(task);
                }
            }
        });
    }

}

//UPDATED
public class Processor{

private int ProcessorId;
private Subsriber<Message> subsriber;

public Processor(int processorId) => ProcessorId = processorId;

public void Start(CancellationToken token)
{
    Subsriber<Message> subsriber = new Subsriber<Message>()
    {
        Interval = 1000
    };

    subsriber.Callback(Process, m => m != null);
}

private void Process()
{
    //do work
}
}

Надеюсь, это даст вам представление о том, как еще вы можете подойти к вашей проблеме, и что я не упустил из виду:).


Обновление

Чтобы использовать события для обновления хода выполнения или задач, которые я обрабатываю, я бы извлек их в свой собственный класс, в котором затем есть методы подписки, а при создании нового экземпляра этого класса назначьте событиек обработчику в родительском классе, который затем может обновить ваш пользовательский интерфейс или все, что вы хотите, чтобы он делал с этой информацией.

Таким образом, содержимое Process() будет выглядеть примерно так:

Processor processor = new Processor();
    Task task = Task.Factory.StartNew(() => processor.ProcessMessage(cancellationTokenSource.CancellationToken));

    processor.StatusUpdated += ReportProcess;
0 голосов
/ 25 февраля 2019

Я думаю, что то, что вы ищете, приведет к довольно большой выборке.Я пытаюсь просто продемонстрировать, как бы вы сделали это с ActionBlock<T>.Там еще много неизвестных, поэтому я оставил образец как скелет, который вы можете построить.В этом примере ActionBlock будет обрабатывать и обрабатывать параллельно все ваши сообщения, полученные от вашей системы обмена сообщениями

public class Processor
{
    private readonly IMessagingSystem _messagingSystem;
    private readonly ActionBlock<Message> _handler;
    private bool _pollForMessages;

    public Processor(IMessagingSystem messagingSystem)
    {
        _messagingSystem = messagingSystem;
        _handler = new ActionBlock<Message>(msg => Process(msg), new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = 5 //or any configured value
        });
    }

    public async Task Start()
    {
        _pollForMessages = true;
        while (_pollForMessages)
        {
            var msg = await _messagingSystem.ReceiveMessageAsync();
            await _handler.SendAsync(msg);
        }

    }

    public void Stop()
    {
        _pollForMessages = false;
    }

    private void Process(Message message)
    {
        //handle message
    }
}

Дополнительные примеры

И Идеи

...