Как максимизировать производительность процесса (C #)? - PullRequest
0 голосов
/ 30 октября 2018

Я хочу обработать некоторые файлы с максимальной пропускной способностью. Пути к файлам сохраняются в базе данных. Мне нужно получить пути к файлам из базы данных, изменить их статус на обработку, обработать их, а затем изменить их статус на завершенный или неудачный.

В настоящее время я получаю файлы партиями (из 100 файлов), чтобы уменьшить количество выполненных запросов и обрабатывать их параллельно (со степенью параллелизма 10). Но при этом я теряю пропускную способность к концу пакета. Если в пакете осталось менее 10 файлов, степень параллелизма больше не равна 10, а уменьшается.

Вот что у меня есть:

private async Task CopyPendingFilesAsync(SourcePath sourcePath, Options options)
{
    var batchIndex = 0;
    while (true)
    {
        var fileBatch = _sourceFileService.GetSourceFileBatchBySourcePathId(
            sourcePath.Id, _dataSourceExportConfig.FileCopyBatchSize, Status.Pending);
        if (fileBatch.Count == 0)
            return;

        await SetInProgressStatusForBatch(fileBatch)
            .ConfigureAwait(false);

        fileBatch
            .AsParallel()
            .WithDegreeOfParallelism(_dataSourceExportConfig.FileCopyDegreeOfParallelism)
            .ForAll(file => ProcessFile(file, destinationBase, options));

        await _sourceFileService
            .UpdateSourceFilesStatusAsync(fileBatch)
            .ConfigureAwait(false);

        batchIndex++;
    }
}

private async Task SetInProgressStatusForBatch(IEnumerable<SourceFile> fileBatch)
{
    foreach (var file in fileBatch)
        file.Status = Status.InProgress;

    await _sourceFileService
        .UpdateSourceFilesStatusAsync(fileBatch)
        .ConfigureAwait(false);
}

private void ProcessFile(
    SourceFile file,
    string destinationBase,
    Options options)
{
    try
    {
        //do something ...

        file.Status = Status.Success;
        file.ExceptionMessage = null;
    }
    catch (Exception ex)
    {
        _logger.Error(ex);
        file.Status = Status.Failed;
        file.ExceptionMessage = ex.Message;
    }
}

Как я могу максимизировать пропускную способность? Я читал о шаблоне «производитель-потребитель» с BlockingCollection, TPL Dataflow и Rx, и я почти уверен, что то, чего я хочу достичь, может быть реализовано с помощью любого из вышеперечисленного, но я пока не смог этого сделать. С шаблоном «производитель-потребитель» мой производитель чрезвычайно быстр по сравнению с потребителем, с потоком данных TPL я застрял с BatchBlock и не пробовал Rx. Может ли кто-нибудь указать мне правильное направление?

Обновление: Вот минимальный, полный и проверяемый пример:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;

namespace ConsoleApp1
{
    internal static class Program
    {
        private static void Main()
        {
            Console.WriteLine("Processing files");

            var stopWatch = new Stopwatch();
            stopWatch.Start();

            var fileService = new FileService();
            fileService.ProcessPendingFiles();

            foreach (var sourceFile in fileService.SourceFiles)
            {
                Console.WriteLine($"{sourceFile.Id} {sourceFile.Status}");
            }

            Console.WriteLine(stopWatch.Elapsed);

            Console.ReadLine();
        }
    }

    public class FileService
    {
        private const int BatchSize = 100;
        private const int DegreeOfParallelism = 10;
        //this SourceFiles property replaces the Sqlite database where the data is actually stored
        public ICollection<SourceFile> SourceFiles =
            Enumerable
                .Range(0, 1000)
                .Select(i =>
                    new SourceFile
                    {
                        Id = i,
                        Path = "source file path",
                        Status = Status.Pending,
                    })
                .ToList();

        public void ProcessPendingFiles()
        {
            while (true)
            {
                var fileBatch = GetSourceFileBatch(BatchSize, Status.Pending);
                if (fileBatch.Count == 0)
                    return;

                SetInProgressStatusForBatch(fileBatch);

                fileBatch
                    .AsParallel()
                    .WithDegreeOfParallelism(DegreeOfParallelism)
                    .ForAll(ProcessFile);

                UpdateSourceFiles(fileBatch);
            }
        }

        private ICollection<SourceFile> GetSourceFileBatch(int batchSize, Status status)
            => SourceFiles
                .Where(sf => sf.Status == status)
                .Take(batchSize)
                .ToList();

        //set status to in progress for all files in the batch
        //and save the changes to database
        //in the application this is actually done with a bulk update and the method is async
        private void SetInProgressStatusForBatch(IEnumerable<SourceFile> fileBatch)
        {
            foreach (var file in fileBatch)
            {
                file.Status = Status.InProgress;

                var sourceFile = SourceFiles.First(sf => sf.Id == file.Id);
                sourceFile.Status = file.Status;
            }
        }

        //set status and exception messages for all files in the batch
        //and save the changes to database
        //in the application this is actually done with a bulk update and the method is async
        private void UpdateSourceFiles(IEnumerable<SourceFile> fileBatch)
        {
            foreach (var file in fileBatch)
            {
                var sourceFile = SourceFiles.First(sf => sf.Id == file.Id);
                sourceFile.Status = file.Status;
                sourceFile.ExceptionMessage = file.ExceptionMessage;
            }
        }

        private void ProcessFile(SourceFile file)
        {
            try
            {
                //do something ...
                Thread.Sleep(20);

                file.Status = Status.Success;
                file.ExceptionMessage = null;
            }
            catch (Exception ex)
            {
                file.Status = Status.Failed;
                file.ExceptionMessage = ex.Message;
            }
        }
    }

    public class SourceFile
    {
        public int Id { get; set; }

        public string Path { get; set; }

        public Status Status { get; set; }

        public string ExceptionMessage { get; set; }
    }

    public enum Status
    {
        Pending,

        InProgress,

        Success,

        Failed,
    }
}

Ответы [ 4 ]

0 голосов
/ 30 октября 2018

Эта операция, конечно, может быть выполнена с TPL-Dataflow, как вы упомянули, но трудно понять, действительно ли вы видите какое-либо увеличение пропускной способности. С любой метрикой производительности лучшее, что вы можете сделать, это попробовать разные подходы и измерить результаты.

Этот образец включает в себя наиболее важные параметры для настройки поведения потока данных, чтобы вы могли экспериментировать. Структура основана на вашем примере кода с некоторыми допущениями.

  • Один SourcePath дает партию SourceFile
  • Обновление SourceFile Статус асинхронный
  • Обработка SourceFile синхронизация

Пример:

public class ProcessFilesFlow
{
    private TransformBlock<SourcePath, IEnumerable<SourceFile>> _getSourceFileBatch;
    private TransformBlock<IEnumerable<SourceFile>, IEnumerable<SourceFile>> _setStatusToProcessing;
    private TransformBlock<IEnumerable<SourceFile>, IEnumerable<SourceFile>> _processFiles;
    private ActionBlock<IEnumerable<SourceFile>> _setStatusToComplete;

    public ProcessFilesFlow()
    {
        //Setup options
        //All of these options and more can be tuned for throughput
        var getSourceFileBatchOptions = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 10, //How many source paths to queue at one time
            MaxDegreeOfParallelism = 10, //How many source paths to get batches for at one time
            EnsureOrdered = false //Process batches as soon as ready
        };
        var setStatusToProcessingOptions = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 10, //How many batches to queue at one time
            MaxDegreeOfParallelism = 10, //Unlimited, how many batches to updates status for
            EnsureOrdered = false //Process batches as soon as ready
        };
        var processFilesOptions = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 10, //Batches to queue at one time
            MaxDegreeOfParallelism = 10, //Batches to work on at the same time
            EnsureOrdered = false //Process batches as soon as ready
        };
        var setStatusToCompleteOptions = new ExecutionDataflowBlockOptions()
        {
            BoundedCapacity = 10, //Batches to queue at one time
            MaxDegreeOfParallelism = 10, //Batches to update at once
            EnsureOrdered = false //Process batches as soon as ready
        };

        //Build the dataflow pipeline
        _getSourceFileBatch = new TransformBlock<SourcePath, IEnumerable<SourceFile>>(path => GetSourceFileBatch(path), getSourceFileBatchOptions);
        _setStatusToProcessing = new TransformBlock<IEnumerable<SourceFile>, IEnumerable<SourceFile>>(batch => SetStatusToProcessingAsync(batch), setStatusToProcessingOptions);
        _processFiles = new TransformBlock<IEnumerable<SourceFile>, IEnumerable<SourceFile>>(batch => ProcessFiles(batch), processFilesOptions);
        _setStatusToComplete = new ActionBlock<IEnumerable<SourceFile>>(batch => SetStatusToCompleteAsync(batch), setStatusToCompleteOptions);

        //Link the pipeline
        _getSourceFileBatch.LinkTo(_setStatusToProcessing, new DataflowLinkOptions() { PropagateCompletion = true });
        _setStatusToProcessing.LinkTo(_processFiles, new DataflowLinkOptions() { PropagateCompletion = true });
        _processFiles.LinkTo(_setStatusToComplete, new DataflowLinkOptions() { PropagateCompletion = true });
    }

    public async Task ProcessAll(IEnumerable<SourcePath> sourcePaths)
    {
        foreach(var path in sourcePaths)
        {
            await _getSourceFileBatch.SendAsync(path);
        }
        _getSourceFileBatch.Complete();
        await _setStatusToComplete.Completion;
    }

    private IEnumerable<SourceFile> GetSourceFileBatch(SourcePath sourcePath)
    {
        //Get batch of files based on sourcePath
        return Enumerable.Empty<SourceFile>();
    }

    private async Task<IEnumerable<SourceFile>> SetStatusToProcessingAsync(IEnumerable<SourceFile> sourceFiles)
    {
        //Update file status
        foreach (var file in sourceFiles)
            await file.UpdateStatusAsync("In Progress");
        return sourceFiles;
    }

    private IEnumerable<SourceFile> ProcessFiles(IEnumerable<SourceFile> sourceFiles)
    {
        //process files
        foreach (var file in sourceFiles)
            file.Process();
        return sourceFiles;
    }

    private async Task SetStatusToCompleteAsync(IEnumerable<SourceFile> sourceFiles)
    {
        //Update file status
        foreach (var file in sourceFiles)
            await file.UpdateStatusAsync("Completed");
    }
}

Также доступны другие опции, такие как разделение пакета на TransformManyBlock и параллельная обработка отдельных файлов из пакетов.

0 голосов
/ 30 октября 2018

Я знаю, что вы, вероятно, будете ненавидеть этот ответ, но в конечном итоге это зависит ...

Я не совсем уверен, что это за файлы, где они живут или что означает их обработка. В моем ответе предполагается, что вы довольны текущей обработкой на пике, вам просто нужен лучший способ обеспечить стабильную производительность, а не снижение производительности. Я постараюсь ответить на ваш более прямой вопрос с точки зрения использования шаблона производитель-потребитель с BlockingCollection, а не менять весь подход.

Я думаю, вы понимаете, почему происходит замедление, но вы не знаете, как с этим справиться, так как вы выбираете следующую партию элементов только после завершения текущей партии. (Само собой разумеется, что это, вероятно, хороший случай для использования очереди сообщений, а не SQL, но это несколько отдельное обсуждение, которое избегает вашего основного вопроса.)

На этот вопрос довольно подробно ответили на следующий вопрос:

Классический шаблон для потребителя-производителя с использованием блокировки и задач .net 4 TPL

public class YourCode
{
  private BlockingCollection<object> queue = new BlockingCollection<object>();

  public YourCode()
  {
    var thread = new Thread(StartConsuming);
    thread.IsBackground = true;
    thread.Start();
  }

  public void Produce(object item)
  {
    queue.Add(item);
  }

  private void StartConsuming()
  {
    while (true)
    {
      object item = queue.Take();
      // Add your code to process the item here.
      // Do not start another task or thread. 
    }
  }
}

Тогда у вас может быть несколько потребителей с одним производителем (поскольку вы указываете, что вы производите намного быстрее, чем потребляете)

0 голосов
/ 30 октября 2018

Рабочий шаблон должен упростить для вас вещи и гарантировать, что вы всегда параллельно обрабатываете согласованное количество единиц работы.

Если вы создаете, например, 10 заданий заранее и позволяете им выполнять новое задание до тех пор, пока не останется ни одного, вы больше не будете полагаться на ожидание завершения целого пакета потоков или заданий перед тем, как начинать работу.

class WorkController
{
    private DataSourceExportConfig _dataSourceExportConfig;
    private SourceFileService _sourceFileService;
    private string destinationBase;

    public async Task CopyPendingFilesAsync(SourcePath sourcePath, Options options)
    {
        await Task.WhenAll(Enumerable.Range(0, 10).Select(x => Worker(sourcePath, options)));
    }

    public async Task Worker(SourcePath sourcePath, Options options)
    {
        SourceFile file = null;

        while (_sourceFileService.GetNextFile(out file))
        {
            ProcessFile(file, destinationBase, options);
        }
    }

    private void ProcessFile(SourceFile file, string destinationBase, Options options)
    {
    }
}
0 голосов
/ 30 октября 2018

Это операция с диском. Параллелизация не работает хорошо на тех. Диски имеют физически ограниченную пропускную способность. А бомбардировка его запросами приведет только к добавлению времени поиска ко всему вычислению. Существуют функции, такие как NCQ , которые будут пытаться смягчить этот эффект, но они имеют ограничения.

В сети, по крайней мере, паралилизация может иметь некоторый эффект:

  • с использованием Medium, когда один запрос находится в фазе "издержки Protocoll"
  • обойти ограничения на соединение, которые могут быть на месте

Но даже там существуют жесткие ограничения.

Лучший способ иметь быстрые операции с диском - это не иметь ужасного внутреннего диска. то есть, не используя вращающийся диск. Или, по крайней мере, организовать их в Raid 0 или аналогичную структуру.

...