.NET Как сообщить (коллективный) прогресс о параллельных рабочих нагрузках, где каждый объект / задача сообщает о своем собственном прогрессе? - PullRequest
0 голосов
/ 03 октября 2018

Я работаю над параллельными рабочими нагрузками, где каждый объект или задача сообщает о своем собственном прогрессе, и я хочу сообщить о коллективном прогрессе задачи в целом.

Например, представьте, что у меня есть 10 рабочих объектовкоторые все сообщают об индивидуальном прогрессе.Они содержат 0-100 «заданий», которые должны быть выполнены.

Если бы мы должны были линейно выполнять итерации по каждому из объектов Work, мы могли бы легко сообщить о нашем прогрессе и увидеть что-то вроде этого:

Work item #1 of 10 is currently 1 of 100 tasks completed.
Work item #1 of 10 is currently 2 of 100 tasks completed.
...
Work item #10 of 10 is currently 100 of 100 tasks completed.

Однако при параллельной работе вывод будет выглядеть примерно так:

Work item #1 of 10 is currently 1 of 100 tasks completed.
Work item #4 of 10 is currently 16 of 100 tasks completed.
Work item #7 of 10 is currently 4 of 100 tasks completed.
...
Work item #10 of 10 is currently 100 of 100 tasks completed.

Проблема, которую я пытаюсь решить, заключается в объединении всего процесса в параллельных циклах, так что вывод напользователь больше похож на «1/1000» или «10/1000», представляющий общий объем выполненной работы, и обновляющий числитель по мере продолжения работы.

Я ожидаю, что есть решение или шаблон, который подходит независимо от того,Async / Await или использование асинхронного шаблона задачи - я использую оба - и я надеюсь, что уже есть способы справиться с этим в .NET Framework, который я не обнаружил.

Использование этого простого (pseudocode) пример из TAP:

Parallel.ForEach(WorkObject, wo =>
{
    // Perhaps each WorkObject has a "ProgressChanged" delegate that fires progress notifications.
    wo.ProgressChanged += delegate (int currentProgress, int totalProgress)
    {
        ReportProgress($"Work item #{wo.ID} of {WorkObject.Count} is currently {currentProgress} of {totalProgress} tasks completed.
    };

    // Or perhaps using IProgress<T> or Progress?
    // wo.PerformWork(/*IProgress<T> or Progress<T>, etc.*/);
});

Мы можем выполнять параллельные итерации, и обновления / уведомления о прогрессе будут поступать по завершении каждого потокаединица работы.

Как мы можем эффективно объединить прогресс всех WorkObjects так, чтобы мы могли сообщать о более равномерном "1/1000" выполненном?

Проблема в том, что каждый WorkObjectможет иметь разное количество «заданий» для выполнения, и у нас может быть разное количество WorkObjects, которые должны работать.Если просто объединить числитель и знаменатель из всех рабочих объектов при поступлении каждого уведомления о ходе выполнения (при условии, что они обновляются после завершения каждой единицы работы), к концу параллельной рабочей нагрузки уведомление о ходе выполнения будет отображать что-то вроде «1000/100 000».вместо «1000/1000».

Кажется, что нам нужен способ отслеживать текущий прогресс X, а также общий прогресс Y, чтобы сформировать связное сообщение для пользователя о состоянии общего прогресса(X of Y завершено.)

Существует ли существующая модель (в платформе или иным образом) для этого?

Моя текущая мысль - создать структуру данных, записывающую идентификатор потока каждогопоток выполняется параллельно, а затем отслеживает прогресс каждого потока в этом значении структуры данных (в виде X / Y) и, наконец, каждый поток публикует обновление прогресса, перебирая структуру данных для суммирования X / Y из каждого потока, чтобы сгенерироватьВсего "X / Y" для отображения пользователю.

Но, безусловно, эта проблемас разработчиками сталкиваются каждый день - значит, должен быть другой путь?

Ответы [ 2 ]

0 голосов
/ 01 июля 2019

Я закончил тем, что создал класс для управления продвижением по потокам;вот что я придумал:

// A Parallel Progress Manager is designed to collect progress information from multiple sources and provide a total sum of progress.
// For example, if 3 objects are going to perform some work in parallel, and the first object has 10 tasks, the second has 100, and the last has 1000,
// when executing in parallel, it isn't useful to have each task fire a ProgressChanged() event (or something similar), as it would result in the progress
// being returned something like 0/10, 1/10, 2/10, 0/100, 3/10, 1/100, 0/1000, etc. (As each thread executes independently.)
//
// Instead, this class aggregates progress and provides a total sum of progress: 0/1110, 1/1110, etc.
//
// NOTE: The intention of this class is to manage parallelized workloads across numerous jobs. For example, operating in parallel against 3 different objects
// that all report progress independently, such as Paralle.ForEach(IEnumerable<T>). This is not suggested for parallelized workloads of a single job, such as
// Parallel.For(i, 100)—in this case, it is recommended to update progress using Interlocked.Increment() or a lock() on a synchronization object as one would normally.

// Example:
//
// ParallelProgressManager ppm = new ParallelProgressManager();
//
// Parallel.ForEach(IEnumerable<T>, t =>
// {
//      t.ProgressChanged += delegate (long current, long total, bool indeterminate, string message)
//      {
//          lock(ppm)
//          {
//              var x = ppm.SetGetProgress(t.GetHashCode(), current, total);
//
//              ReportProgress(x.Item1, x.Item2, false, $"Working... {x.Item1} / {x.Item2}");
//          }
//      }
// });

using System;
using System.Collections.Generic;

namespace Threading
{
    /// <summary>
    /// A Parallel Progress Manager used to aggregate and sum progress across multiple objects working in parallel.
    /// </summary>
    public class ParallelProgressManager
    {
        /// <summary>
        /// The progress class contains current and total progress and
        /// </summary>
        protected class Progress
        {
            public long Current { get; set; } = 0;
            public long Total { get; set; } = 0;
        }

        /// <summary>
        /// The ProgressDictionary associates each working object's Hash Code with it's current progress (via a Progress object.)
        /// This way an object can operate in parallel and as progress updates come in, the last update is replaced by the new one.
        /// We can then sum the "current" and "total" to produce an overall progress value.
        /// </summary>
        private Dictionary<int, Progress> ProgressDictionary { get; set; } = new Dictionary<int, Progress>();

        /// <summary>
        /// Sets an object's progress via it's Hash Code. If the object isn't recognized, a new entry will be made for it. If it is recognized,
        /// it's progress will be updated accordingly.
        /// </summary>
        /// <param name="hashCode">
        /// The Hash Code of the object (.GetHashCode()) that's reporting progress. The Hash Code is used to distinguish the objects to manage progress of.
        /// </param>
        /// <param name="current">
        /// The current progress.
        /// </param>
        /// <param name="total">
        /// The total progress.
        /// </param>
        public void SetProgress(int hashCode, long current, long total)
        {
            if (!ProgressDictionary.ContainsKey(hashCode))
                ProgressDictionary.Add(hashCode, new Progress() { Current = current, Total = total });
            else
            {
                ProgressDictionary[hashCode].Current = current;
                ProgressDictionary[hashCode].Total = total;
            }
        }

        /// <summary>
        /// Retrieves the total progress of all objects currently being managed.
        /// </summary>
        /// <returns>
        /// A Tuple where the first value represents the summed current progress, and the second value represents the summed total progress.
        /// </returns>
        public Tuple<long, long> GetProgress()
        {
            long c = 0;
            long t = 0;

            foreach (var p in ProgressDictionary)
            {
                c += p.Value.Current;
                t += p.Value.Total;
            }

            return Tuple.Create(c, t);
        }

        /// <summary>
        /// Sets progress for the provided object and retrieves an updated total progress. This is equivalent to calling SetProgress() and then calling
        /// GetProgress() immediately after.
        /// </summary>
        /// <param name="hashCode"></param>
        /// <param name="currentStep"></param>
        /// <param name="totalSteps"></param>
        /// <returns></returns>
        public Tuple<long, long> SetGetProgress(int hashCode, long currentStep, long totalSteps)
        {
            SetProgress(hashCode, currentStep, totalSteps);
            return GetProgress();
        }
    }
}
0 голосов
/ 03 октября 2018

Ниже приведен один из возможных подходов.Подобно тому, что я описал выше, за исключением того, что я передал «работу» Задаче и перекачал ReportProgress из начального контекста потока.

Во-первых, пара классов.Я использую Random, чтобы решить, сколько времени займет каждое задание и сколько заданий в каждом WorkObject.Работа эмулирует высокую нагрузку на процессор с помощью жесткой петли.Вы бы использовали свои собственные объекты (и действительно полезную работу).

public class Job
{
    private readonly TimeSpan timeForJobToTake;

    public Job(TimeSpan timeForJobToTake)
    {
        this.timeForJobToTake = timeForJobToTake;
    }

    public void DoJob()
    {
        DateTime endTime = DateTime.UtcNow.Add(this.timeForJobToTake);
        while (DateTime.UtcNow < endTime)
        {
            // emulate high CPU load during job
        }
    }
}

public class WorkObject
{
    private readonly List<Job> jobs = new List<Job>();

    public WorkObject(Random random)
    {
        int jobsToCreate = random.Next(1, 10);
        for (int i = 0; i < jobsToCreate; i++)
        {
            Job job = new Job(TimeSpan.FromMilliseconds(random.Next(100, 200)));
            this.jobs.Add(job);
        }
    }

    public int JobCount => this.jobs.Count;

    public void PerformWork()
    {
        foreach (Job job in this.jobs)
        {
            job.DoJob();
        }
    }
}

Затем вы можете сделать что-то вроде следующего (консольное приложение, но код может работать в других контекстах):

internal class Program
{
    private static readonly object syncObj = new object();

    private static int lastNumerator;

    private static int numerator;

    private static int denominator;

    private static void ReportProgress()
    {
        int currentNumerator = numerator;
        // Don't emit progress if nothing changed
        if (currentNumerator == lastNumerator) return;
        Console.WriteLine($"{currentNumerator} of {denominator}");
        lastNumerator = currentNumerator;
    }

    private static void Main(string[] args)
    {
        MainAsync().Wait();
        Console.ReadLine();
    }

    private static async Task MainAsync()
    {
        // Setup example objects
        Random random = new Random();
        List<WorkObject> workObjects = new List<WorkObject>();

        int numberOfWorkObjects = random.Next(50, 100);
        for (int i = 0; i < numberOfWorkObjects; i++)
        {
            WorkObject workObject = new WorkObject(random);
            denominator += workObject.JobCount;
            workObjects.Add(workObject);
        }

        // The CancellationTokenSource is used to immediately abort the progress reporting once the work is complete
        CancellationTokenSource progressReportCancellationTokenSource = new CancellationTokenSource();

        Task workTask = Task.Run(() =>
                                 {
                                     Parallel.ForEach(workObjects,
                                                      wo =>
                                                      {
                                                          wo.PerformWork();
                                                          lock (syncObj)
                                                          {
                                                              numerator += wo.JobCount;
                                                          }
                                                      });
                                     progressReportCancellationTokenSource.Cancel();
                                 });

        while (!workTask.IsCompleted)
        {
            try
            {
                ReportProgress();
                await Task.Delay(250, progressReportCancellationTokenSource.Token);
            }
            catch (TaskCanceledException)
            {
                break;
            }
        }

        await workTask;
        ReportProgress();
    }
}
...