Многопоточность обработки текста - PullRequest
0 голосов
/ 27 октября 2018

Я написал кусок кода, который обрабатывает файлы. Основная цель - заменить все совпадения (например, ABC: 123 , где 123 может быть любым) в файлах .log. Итак, я разделяю файлы на куски, чтобы предотвратить исключения OutOfMemory. Кроме того, я использую TPL для повышения производительности приложения, но думаю, что в коде есть некоторые недостатки.

Может ли кто-нибудь взглянуть на код или просмотреть его и дать несколько советов? Также код доступен на github

    private static void ProcessFiles()
    {
        var tasks = new BlockingCollection<Task>();

        Parallel.ForEach(FilePaths, path =>
        {
            tasks.Add(Task.Run(() =>
            {
                ProcessFile(path);
            }));
        });

        Task.WaitAll(tasks.ToArray());
    }

    private static void ProcessFile(string path)
    {
        if (!File.Exists(path)) return;

        try
        {
            string text;
            using (var fs = File.Open(path, FileMode.Open, FileAccess.Read))
            using (var bs = new BufferedStream(fs))
            using (var sr = new StreamReader(bs))
            {
                text = sr.ReadToEnd();
            }

            const int chunkSize = 10 * 1024;
            var limit = (text.Length + chunkSize - 1) / chunkSize;
            var chuncks = Enumerable.Range(0, limit).Select(i =>
            {
                var startIndex = i * chunkSize;
                var length = text.Length - startIndex >= chunkSize ? chunkSize : text.Length - startIndex;

                return text.Substring(startIndex, length);
            }).ToList();

            Parallel.ForEach(chuncks, (row, _, index) =>
            {
                var i = Convert.ToInt32(index);
                chuncks[i] = ProcessText(row);
            });
            SaveProcessedFile(path, chuncks);
        }
        catch (Exception ex)
        {
            Logger.Error(ex, ex.Message);
        }
    }

    private static string ProcessText(string oldText)
    {
        var processedText = Pattern.Replace(oldText, Replacement);

        return processedText;
    }

    private static void SaveProcessedFile(string path, List<string> text)
    {
        using (var fs = File.Open(path, FileMode.Create))
        using(var wr = new StreamWriter(fs, Encoding.Default))
        {
            foreach (var chunk in text)
            {
                wr.Write(chunk);
            }
        }
    }

1 Ответ

0 голосов
/ 28 октября 2018

Вам необходимо вызвать bc.CompleteAdding(), чтобы указать, что коллекция блокирования закончила добавление и не должна блокировать перечислимое значение, полученное из bc.GetConsumingEnumerable()

, вот как выглядит вызов:

            using (var bc = new BlockingCollection<string>(new ConcurrentQueue<string>(chuncks)))
            {
                //Signal complete adding
                bc.CompleteAdding();
                var collection = bc.GetConsumingEnumerable();

                Parallel.ForEach(collection, (row, _, index) =>
                {
                    ProcessText(row);
                });

                return String.Join(String.Empty, collection);
            }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...