Я хочу обработать некоторые файлы с максимальной пропускной способностью. Пути к файлам сохраняются в базе данных. Мне нужно получить пути к файлам из базы данных, изменить их статус на обработку, обработать их, а затем изменить их статус на завершенный или неудачный.
В настоящее время я получаю файлы партиями (из 100 файлов), чтобы уменьшить количество выполненных запросов и обрабатывать их параллельно (со степенью параллелизма 10). Но при этом я теряю пропускную способность к концу пакета. Если в пакете осталось менее 10 файлов, степень параллелизма больше не равна 10, а уменьшается.
Вот что у меня есть:
private async Task CopyPendingFilesAsync(SourcePath sourcePath, Options options)
{
var batchIndex = 0;
while (true)
{
var fileBatch = _sourceFileService.GetSourceFileBatchBySourcePathId(
sourcePath.Id, _dataSourceExportConfig.FileCopyBatchSize, Status.Pending);
if (fileBatch.Count == 0)
return;
await SetInProgressStatusForBatch(fileBatch)
.ConfigureAwait(false);
fileBatch
.AsParallel()
.WithDegreeOfParallelism(_dataSourceExportConfig.FileCopyDegreeOfParallelism)
.ForAll(file => ProcessFile(file, destinationBase, options));
await _sourceFileService
.UpdateSourceFilesStatusAsync(fileBatch)
.ConfigureAwait(false);
batchIndex++;
}
}
private async Task SetInProgressStatusForBatch(IEnumerable<SourceFile> fileBatch)
{
foreach (var file in fileBatch)
file.Status = Status.InProgress;
await _sourceFileService
.UpdateSourceFilesStatusAsync(fileBatch)
.ConfigureAwait(false);
}
private void ProcessFile(
SourceFile file,
string destinationBase,
Options options)
{
try
{
//do something ...
file.Status = Status.Success;
file.ExceptionMessage = null;
}
catch (Exception ex)
{
_logger.Error(ex);
file.Status = Status.Failed;
file.ExceptionMessage = ex.Message;
}
}
Как я могу максимизировать пропускную способность? Я читал о шаблоне «производитель-потребитель» с BlockingCollection, TPL Dataflow и Rx, и я почти уверен, что то, чего я хочу достичь, может быть реализовано с помощью любого из вышеперечисленного, но я пока не смог этого сделать. С шаблоном «производитель-потребитель» мой производитель чрезвычайно быстр по сравнению с потребителем, с потоком данных TPL я застрял с BatchBlock и не пробовал Rx. Может ли кто-нибудь указать мне правильное направление?
Обновление:
Вот минимальный, полный и проверяемый пример:
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
namespace ConsoleApp1
{
internal static class Program
{
private static void Main()
{
Console.WriteLine("Processing files");
var stopWatch = new Stopwatch();
stopWatch.Start();
var fileService = new FileService();
fileService.ProcessPendingFiles();
foreach (var sourceFile in fileService.SourceFiles)
{
Console.WriteLine($"{sourceFile.Id} {sourceFile.Status}");
}
Console.WriteLine(stopWatch.Elapsed);
Console.ReadLine();
}
}
public class FileService
{
private const int BatchSize = 100;
private const int DegreeOfParallelism = 10;
//this SourceFiles property replaces the Sqlite database where the data is actually stored
public ICollection<SourceFile> SourceFiles =
Enumerable
.Range(0, 1000)
.Select(i =>
new SourceFile
{
Id = i,
Path = "source file path",
Status = Status.Pending,
})
.ToList();
public void ProcessPendingFiles()
{
while (true)
{
var fileBatch = GetSourceFileBatch(BatchSize, Status.Pending);
if (fileBatch.Count == 0)
return;
SetInProgressStatusForBatch(fileBatch);
fileBatch
.AsParallel()
.WithDegreeOfParallelism(DegreeOfParallelism)
.ForAll(ProcessFile);
UpdateSourceFiles(fileBatch);
}
}
private ICollection<SourceFile> GetSourceFileBatch(int batchSize, Status status)
=> SourceFiles
.Where(sf => sf.Status == status)
.Take(batchSize)
.ToList();
//set status to in progress for all files in the batch
//and save the changes to database
//in the application this is actually done with a bulk update and the method is async
private void SetInProgressStatusForBatch(IEnumerable<SourceFile> fileBatch)
{
foreach (var file in fileBatch)
{
file.Status = Status.InProgress;
var sourceFile = SourceFiles.First(sf => sf.Id == file.Id);
sourceFile.Status = file.Status;
}
}
//set status and exception messages for all files in the batch
//and save the changes to database
//in the application this is actually done with a bulk update and the method is async
private void UpdateSourceFiles(IEnumerable<SourceFile> fileBatch)
{
foreach (var file in fileBatch)
{
var sourceFile = SourceFiles.First(sf => sf.Id == file.Id);
sourceFile.Status = file.Status;
sourceFile.ExceptionMessage = file.ExceptionMessage;
}
}
private void ProcessFile(SourceFile file)
{
try
{
//do something ...
Thread.Sleep(20);
file.Status = Status.Success;
file.ExceptionMessage = null;
}
catch (Exception ex)
{
file.Status = Status.Failed;
file.ExceptionMessage = ex.Message;
}
}
}
public class SourceFile
{
public int Id { get; set; }
public string Path { get; set; }
public Status Status { get; set; }
public string ExceptionMessage { get; set; }
}
public enum Status
{
Pending,
InProgress,
Success,
Failed,
}
}