Вот метод, который можно использовать для обработки файла в чанках с использованием параллелизма и записи обработанных чанков в другой файл, сохраняя исходный порядок. Этот метод использует библиотеку TPL Dataflow , доступную в виде пакета здесь . Вам не нужно устанавливать этот пакет, если вы используете. NET Core, поскольку TPL Dataflow встроен в эту платформу. Другая зависимость - это пакет System.Interactive , который включает метод Buffer
, используемый для разделения строк файла.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
//...
public static async Task ProcessFile(string sourcePath, Encoding sourceEncoding,
string targetPath, Encoding targetEncoding,
Func<string, string> lineTransformation,
int degreeOfParallelism, int chunkSize)
{
using StreamWriter writer = new StreamWriter(targetPath, false, targetEncoding);
var cts = new CancellationTokenSource();
var processingBlock = new TransformBlock<IList<string>, IList<string>>(chunk =>
{
return chunk.Select(line => lineTransformation(line)).ToArray();
}, new ExecutionDataflowBlockOptions()
{
MaxDegreeOfParallelism = degreeOfParallelism,
BoundedCapacity = 100, // prevent excessive buffering
EnsureOrdered = true, // this is the default, but lets be explicit
CancellationToken = cts.Token, // have a way to abort the processing
});
var writerBlock = new ActionBlock<IList<string>>(chunk =>
{
foreach (var line in chunk)
{
writer.WriteLine(line);
}
}); // The default options are OK for this block
// Link the blocks and propagate completion
processingBlock.LinkTo(writerBlock,
new DataflowLinkOptions() { PropagateCompletion = true });
// In case the writer block fails, the processing block must be canceled
OnFaultedCancel(writerBlock, cts);
static async void OnFaultedCancel(IDataflowBlock block, CancellationTokenSource cts)
{
try
{
await block.Completion.ConfigureAwait(false);
}
catch
{
cts.Cancel();
}
}
// Feed the processing block with chunks from the source file
await Task.Run(async () =>
{
try
{
var chunks = File.ReadLines(sourcePath, sourceEncoding)
.Buffer(chunkSize);
foreach (var chunk in chunks)
{
var sent = await processingBlock.SendAsync(chunk, cts.Token)
.ConfigureAwait(false);
if (!sent) break; // Happens in case of a processing failure
}
processingBlock.Complete();
}
catch (OperationCanceledException)
{
processingBlock.Complete(); // Cancellation is not an error
}
catch (Exception ex)
{
// Reading error
// Propagate by completing the processing block in a faulted state
((IDataflowBlock)processingBlock).Fault(ex);
}
}).ConfigureAwait(false);
// All possible exceptions have been propagated to the writer block
await writerBlock.Completion.ConfigureAwait(false);
}
Этот метод использует новый Синтаксис C# 8 для оператора using
. Если вы используете более раннюю версию C#, вам придется добавить фигурные скобки и отступы. Он также использует stati c локальную функцию (также синтаксис C# 8), которую вам, возможно, придется переместить во внешнюю область.
В случае исключения в lineTransformation
функция, выходной файл останется неполным, а также может содержать обработанные строки, которые идут после ошибочной строки. Поэтому в случае исключения убедитесь, что вы не используете выходной файл. Вы также можете включить условный File.Delete
logi c в метод ProcessFile
, если хотите.
Ниже приведен пример использования этого метода. Он асинхронно преобразует большой текстовый файл в верхний регистр:
await ProcessFile("Source.txt", Encoding.UTF8, "Target.txt", Encoding.UTF8, line =>
{
return line.ToUpper();
}, degreeOfParallelism: 3, chunkSize: 100);
Известный недостаток метода ProcessFile
заключается в том, что он подделывает асинхронность путем Task.Run
обхода синхронного File.ReadLines
метод. К сожалению, в настоящее время не существует эффективного встроенного метода для асинхронного чтения строк текстового файла в. NET Framework или. NET Core.