Как обрабатывать данные из файла параллельно в нескольких потоках и записывать их в другой файл, сохраняя исходный порядок данных (C#) - PullRequest
1 голос
/ 26 февраля 2020

Я хотел бы задать вам довольно общий вопрос (хотя мне довольно интересно, как этого добиться в C#).

У меня есть огромный файл, который я хочу прочитать кусками обработайте фрагменты параллельно в нескольких потоках, чтобы ускорить обработку, а затем запишите обработанные данные в другой файл в том же порядке, в котором были прочитаны исходные фрагменты данных (т.е. убедитесь, что первый блок данных, прочитанный из входного файла, будет будет обработан и сохранен первым в выходном файле, второй блок будет обработан и сохранен как второй блок данных в выходной файл, et c ...).

Я думал о реализации- каким-то образом потребителем (т.е. непрерывным чтением из исходного файла чанками - подачей некоторой очереди, из которой набор потоков будет считывать и обрабатывать данные), но я понятия не имею, как записать обработанные данные из этих потоков в выходной файл, чтобы сохранить порядок данных. Даже если бы я попытался поместить обработанные блоки данных, созданные потоками, в другую очередь, из которой они могли бы быть использованы и записаны в выходной файл, я все равно не мог бы контролировать порядок возврата данных из потоков (таким образом, записывая их в выходной файл в правильном порядке).

Есть предложения?

Я новичок в этом деле, поэтому даже теоретические подсказки будут много значить для меня.

Ответы [ 3 ]

2 голосов
/ 26 февраля 2020

Хотя этот вопрос немного открытый и не показывает код ...

Существуют различные подходы к этой проблеме, и все они зависят точно от ваши требования и ограничения .

Хотя, в первую очередь, если bottle -neck , который вы пытаетесь решить, это IO , параллельный , скорее всего, ничего собираюсь помочь.

Однако, если вам нужно сохранить порядок после обработки Работа с процессором в параллельно , существуют различные методы TPL , которые поддерживают порядок, например как

  • PLinq с ParallelEnumerable.AsOrdered
  • TPL DataFlow блоков с параллелью опций с DataflowBlockOptions.EnsureOrdered.
  • Также вы, вероятно, могли бы использовать Reactive Extensions (RX), который, как я считаю, имеет аналогичный

Самый простой подход (предполагая, что данные не могут быть прочитаны и записаны в дискретных блоках) - синхронное чтение фрагментов файла (буфера), обработка данных в параллельно с гарантией упорядоченной функциональности и обратная запись в файл пакетами. Вам, очевидно, придется поиграться с количеством файловых данных, которые вы читаете и пишете (размер буфера), чтобы увидеть, что подходит для вашей ситуации.

Стоит отметить, что вы можете достичь чтения / записи async IO, но это, скорее всего, будет зависеть от фиксированной (взаимоисключающей) структуры файла записи.

0 голосов
/ 27 февраля 2020

Вы должны использовать Microsoft Reactive Framework (также известный как Rx) - NuGet System.Reactive и добавить using System.Reactive.Linq; - тогда вы можете сделать это:

IDisposable subscription =
    File
        .ReadLines("Huge File.txt")
        .ToObservable()
        .Buffer(200)
        .Select((lines, index) => new { lines, index })
        .SelectMany(lis => Observable.Start(() => new { lis.index, output = ProcessChunk(lis.lines) }))
        .ToArray()
        .Select(xs => xs.OrderBy(x => x.index).SelectMany(x => x.output))
        .Subscribe(xs =>
        {
            File.WriteAllLines("Output File.txt", xs.ToArray());
        });

Это обработка строк 200 одновременно, параллельно.

Имейте в виду, что IO намного медленнее, чем обработка ЦП, поэтому, если ProcessChunk не сильно загружает ЦП, то любой многопоточный подход может не улучшить производительность - фактически это может замедлить ее .

0 голосов
/ 26 февраля 2020

Вот метод, который можно использовать для обработки файла в чанках с использованием параллелизма и записи обработанных чанков в другой файл, сохраняя исходный порядок. Этот метод использует библиотеку TPL Dataflow , доступную в виде пакета здесь . Вам не нужно устанавливать этот пакет, если вы используете. NET Core, поскольку TPL Dataflow встроен в эту платформу. Другая зависимость - это пакет System.Interactive , который включает метод Buffer, используемый для разделения строк файла.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
//...

public static async Task ProcessFile(string sourcePath, Encoding sourceEncoding,
    string targetPath, Encoding targetEncoding,
    Func<string, string> lineTransformation,
    int degreeOfParallelism, int chunkSize)
{
    using StreamWriter writer = new StreamWriter(targetPath, false, targetEncoding);
    var cts = new CancellationTokenSource();

    var processingBlock = new TransformBlock<IList<string>, IList<string>>(chunk =>
    {
        return chunk.Select(line => lineTransformation(line)).ToArray();
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = degreeOfParallelism,
        BoundedCapacity = 100, // prevent excessive buffering
        EnsureOrdered = true, // this is the default, but lets be explicit
        CancellationToken = cts.Token, // have a way to abort the processing
    });

    var writerBlock = new ActionBlock<IList<string>>(chunk =>
    {
        foreach (var line in chunk)
        {
            writer.WriteLine(line);
        }
    }); // The default options are OK for this block

    // Link the blocks and propagate completion
    processingBlock.LinkTo(writerBlock,
        new DataflowLinkOptions() { PropagateCompletion = true });

    // In case the writer block fails, the processing block must be canceled
    OnFaultedCancel(writerBlock, cts);

    static async void OnFaultedCancel(IDataflowBlock block, CancellationTokenSource cts)
    {
        try
        {
            await block.Completion.ConfigureAwait(false);
        }
        catch
        {
            cts.Cancel();
        }
    }

    // Feed the processing block with chunks from the source file
    await Task.Run(async () =>
    {
        try
        {
            var chunks = File.ReadLines(sourcePath, sourceEncoding)
                .Buffer(chunkSize);
            foreach (var chunk in chunks)
            {
                var sent = await processingBlock.SendAsync(chunk, cts.Token)
                    .ConfigureAwait(false);
                if (!sent) break; // Happens in case of a processing failure
            }
            processingBlock.Complete();
        }
        catch (OperationCanceledException)
        {
            processingBlock.Complete(); // Cancellation is not an error
        }
        catch (Exception ex)
        {
            // Reading error
            // Propagate by completing the processing block in a faulted state
            ((IDataflowBlock)processingBlock).Fault(ex);
        }
    }).ConfigureAwait(false);

    // All possible exceptions have been propagated to the writer block
    await writerBlock.Completion.ConfigureAwait(false);
}

Этот метод использует новый Синтаксис C# 8 для оператора using. Если вы используете более раннюю версию C#, вам придется добавить фигурные скобки и отступы. Он также использует stati c локальную функцию (также синтаксис C# 8), которую вам, возможно, придется переместить во внешнюю область.

В случае исключения в lineTransformation функция, выходной файл останется неполным, а также может содержать обработанные строки, которые идут после ошибочной строки. Поэтому в случае исключения убедитесь, что вы не используете выходной файл. Вы также можете включить условный File.Delete logi c в метод ProcessFile, если хотите.

Ниже приведен пример использования этого метода. Он асинхронно преобразует большой текстовый файл в верхний регистр:

await ProcessFile("Source.txt", Encoding.UTF8, "Target.txt", Encoding.UTF8, line =>
{
    return line.ToUpper();
}, degreeOfParallelism: 3, chunkSize: 100);

Известный недостаток метода ProcessFile заключается в том, что он подделывает асинхронность путем Task.Run обхода синхронного File.ReadLines метод. К сожалению, в настоящее время не существует эффективного встроенного метода для асинхронного чтения строк текстового файла в. NET Framework или. NET Core.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...