TPL: как разделить и объединить поток данных? - PullRequest
0 голосов
/ 05 ноября 2019

Я пытаюсь создать поток данных, используя tpl со следующей формой:

                    -> LoadDataBlock1 -> ProcessDataBlock1 ->  
GetInputPathsBlock  -> LoadDataBlock2 -> ProcessDataBlock2 -> MergeDataBlock -> SaveDataBlock
                    -> LoadDataBlock3 -> ProcessDataBlock3 ->
                    ...                             
                    -> LoadDataBlockN -> ProcessDataBlockN ->

Идея состоит в том, что GetInputPathsBlock является блоком, который находит пути к входным данным, которые должны бытьзагружен, а затем отправляет путь к каждому LoadDataBlock. Все LoadDataBlocks идентичны (за исключением того, что каждый из них получил уникальную строку inputPath из GetInputPaths). Загруженные данные затем отправляются на ProcessDataBlock, который выполняет простую обработку. Затем данные с каждого ProcessDataBlock отправляются на MergeDataBlock, который объединяет их и отправляет на SaveDataBlock, который затем сохраняет их в файл.

Думайте об этом как о потоке данных, который необходимо запуститьза каждый месяц. Сначала определяется путь для данных за каждый день. Данные каждого дня загружаются и обрабатываются, а затем объединяются в течение всего месяца и сохраняются. Каждый месяц может выполняться параллельно, данные для каждого дня в месяце могут загружаться параллельно и обрабатываться параллельно (после загрузки данных за отдельный день), а после загрузки и обработки всего данных за месяц их можно объединить и сохранить.

То, что я пытался

Насколько я могу сказать, TransformManyBlock<TInput,string> может использоваться для разделения (GetInputPathsBlock) и может быть связано собычный TransformBlock<string,InputData> (LoadDataBlock) и оттуда к другому TransformBlock<InputData,ProcessedData> (ProcessDataBlock), но я не знаю, как затем объединить его обратно в один блок.

На что я смотрел

Я нашел этот ответ , который использует TransformManyBlock, чтобы перейти от IEnumerable<item> к item, но я не до конца понимаю,и я не могу связать TransformBlock<InputData,ProcessedData> (ProcessDataBlock) с TransformBlock<IEnumerable<ProcessedData>>,ProcessedData>, поэтому я не знаю, как его использовать.

Я также видел ответы вот так , что предполагает использование JoinBlock, но количество входных файлов N варьируется, и все файлы все равно загружаются одинаково.

Существует также этот ответ , которая, кажется, делает то, что я хочу, но я не до конца понимаю, и я не знаю, как настройки со словарем будут перенесены в мой случай.

КакЯ разделяю и объединяю свой поток данных?

  • Существует ли тип блока, который мне не хватает
  • Можно ли как-то использовать TransformManyBlock дважды?
  • Имеет ли tplимеет смысл для разделения / слияния или есть более простой способ асинхронизации / ожидания?

Ответы [ 2 ]

0 голосов
/ 15 ноября 2019

Ответы на ваши вопросы: нет, вам не нужен другой тип блока, да, вы можете использовать TransformManyBlock дважды, и да, это имеет смысл. Я написал некоторый код, чтобы доказать это, который находится внизу, и некоторые примечания о том, как это работает, и после этого.

Код использует конвейер разделения, а затем слияния, как вы описали. Что касается того, с чем вы боролись: объединить данные для отдельных файлов вместе можно, добавив обработанные элементы в список по мере их появления. Затем мы только передаем список следующему блоку, если он имеет ожидаемое конечное количество элементов. Это можно сделать с помощью довольно простого блока TransformMany, возвращающего ноль или один элемент. Этот блок нельзя распараллелить, поскольку список не является потокобезопасным.

Получив такой конвейер, вы можете проверить распараллеливание и упорядочение, просто используя параметры, переданные блокам. Приведенный ниже код устанавливает распараллеливание неограниченным для каждого возможного блока и позволяет коду DataFlow его разобрать. На моей машине он максимально использует все ядра / логические процессоры и привязан к процессору, что нам и нужно. Упорядочивание включено, но его отключение не имеет большого значения: опять же, мы привязаны к процессору.

Наконец, я должен сказать, что это очень крутая технология, но вы можете решить эту проблему гораздо большепросто используя PLINQ, где всего несколько строк кода, чтобы получить что-то столь же быстро. Большим недостатком является то, что вы не можете легко добавлять быстро поступающие сообщения в конвейер, если делаете это: PLINQ лучше подходит для одного процесса большой партии. Однако PLINQ может быть лучшим решением для вашего варианта использования.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

namespace ParallelDataFlow
{
    class Program
    {
        static void Main(string[] args)
        {
            new Program().Run();
            Console.ReadLine();
        }

        private void Run()
        {
            Stopwatch s = new Stopwatch();
            s.Start();

            // Can  experiment with parallelization of blocks by changing MaxDegreeOfParallelism
            var options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded };
            var getInputPathsBlock = new TransformManyBlock<(int, int), WorkItem>(date => GetWorkItemWithInputPath(date), options);
            var loadDataBlock = new TransformBlock<WorkItem, WorkItem>(workItem => LoadDataIntoWorkItem(workItem), options);
            var processDataBlock = new TransformBlock<WorkItem, WorkItem>(workItem => ProcessDataForWorkItem(workItem), options);
            var waitForProcessedDataBlock = new TransformManyBlock<WorkItem, List<WorkItem>>(workItem => WaitForWorkItems(workItem));  // Can't parallelize this block
            var mergeDataBlock = new TransformBlock<List<WorkItem>, List<WorkItem>>(list => MergeWorkItemData(list), options);
            var saveDataBlock = new ActionBlock<List<WorkItem>>(list => SaveWorkItemData(list), options);

            var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
            getInputPathsBlock.LinkTo(loadDataBlock, linkOptions);
            loadDataBlock.LinkTo(processDataBlock, linkOptions);
            processDataBlock.LinkTo(waitForProcessedDataBlock, linkOptions);
            waitForProcessedDataBlock.LinkTo(mergeDataBlock, linkOptions);
            mergeDataBlock.LinkTo(saveDataBlock, linkOptions);

            // We post individual tuples of (year, month) to our pipeline, as many as we want
            getInputPathsBlock.Post((1903, 2));  // Post one month and date
            var dates = from y in Enumerable.Range(2015, 5) from m in Enumerable.Range(1, 12) select (y, m);
            foreach (var date in dates) getInputPathsBlock.Post(date);  // Post a big sequence         

            getInputPathsBlock.Complete();
            saveDataBlock.Completion.Wait();
            s.Stop();
            Console.WriteLine($"Completed in {s.ElapsedMilliseconds}ms on {ThreadAndTime()}");
        }

        private IEnumerable<WorkItem> GetWorkItemWithInputPath((int year, int month) date)
        {
            List<WorkItem> processedWorkItems = new List<WorkItem>();  // Will store merged results
            return GetInputPaths(date.year, date.month).Select(
                path => new WorkItem
                {
                    Year = date.year,
                    Month = date.month,
                    FilePath = path,
                    ProcessedWorkItems = processedWorkItems
                });
        }

        // Get filepaths of form e.g. Files/20191101.txt  These aren't real files, they just show how it could work.
        private IEnumerable<string> GetInputPaths(int year, int month) =>
            Enumerable.Range(0, GetNumberOfFiles(year, month)).Select(i => $@"Files/{year}{Pad(month)}{Pad(i + 1)}.txt");

        private int GetNumberOfFiles(int year, int month) => DateTime.DaysInMonth(year, month);

        private WorkItem LoadDataIntoWorkItem(WorkItem workItem) {
            workItem.RawData = LoadData(workItem.FilePath);
            return workItem;
        }

        // Simulate loading by just concatenating to path: in real code this could open a real file and return the contents
        private string LoadData(string path) => "This is content from file " + path;

        private WorkItem ProcessDataForWorkItem(WorkItem workItem)
        {
            workItem.ProcessedData = ProcessData(workItem.RawData);
            return workItem;
        }

        private string ProcessData(string contents)
        {
            Thread.SpinWait(11000000); // Use 11,000,000 for ~50ms on Windows .NET Framework.  1,100,000 on Windows .NET Core.
            return $"Results of processing file with contents '{contents}' on {ThreadAndTime()}";
        }

        // Adds a processed WorkItem to its ProcessedWorkItems list.  Then checks if the list has as many processed WorkItems as we 
        // expect to see overall.  If so the list is returned to the next block, if not we return an empty array, which passes nothing on.
        // This isn't threadsafe for the list, so has to be called with MaxDegreeOfParallelization = 1
        private IEnumerable<List<WorkItem>> WaitForWorkItems(WorkItem workItem)
        {
            List<WorkItem> itemList = workItem.ProcessedWorkItems;
            itemList.Add(workItem);
            return itemList.Count == GetNumberOfFiles(workItem.Year, workItem.Month) ? new[] { itemList } : new List<WorkItem>[0];
        }

        private List<WorkItem> MergeWorkItemData(List<WorkItem> processedWorkItems)
        {
            string finalContents = "";
            foreach (WorkItem workItem in processedWorkItems)
            {
                finalContents = MergeData(finalContents, workItem.ProcessedData);
            }
            // Should really create a new data structure and return that, but let's cheat a bit
            processedWorkItems[0].MergedData = finalContents;
            return processedWorkItems;
        }

        // Just concatenate the output strings, separated by newlines, to merge our data
        private string MergeData(string output1, string output2) => output1 != "" ? output1 + "\n" + output2 : output2;

        private void SaveWorkItemData(List<WorkItem> workItems)
        {
            WorkItem result = workItems[0];
            SaveData(result.MergedData, result.Year, result.Month);
            // Code to show it's worked...
            Console.WriteLine($"Saved data block for {DateToString((result.Year, result.Month))} on {ThreadAndTime()}." +
                              $"  File contents:\n{result.MergedData}\n");
        }
        private void SaveData(string finalContents, int year, int month)
        {
            // Actually save, although don't really need to in this test code
            new DirectoryInfo("Results").Create();
            File.WriteAllText(Path.Combine("Results", $"results{year}{Pad(month)}.txt"), finalContents);
        }

        // Helper methods
        private string DateToString((int year, int month) date) => date.year + Pad(date.month);
        private string Pad(int number) => number < 10 ? "0" + number : number.ToString();
        private string ThreadAndTime() => $"thread {Pad(Thread.CurrentThread.ManagedThreadId)} at {DateTime.Now.ToString("hh:mm:ss.fff")}";
    }

    public class WorkItem
    {
        public int Year { get; set; }
        public int Month { get; set; }
        public string FilePath { get; set; }
        public string RawData { get; set; }
        public string ProcessedData { get; set; }
        public List<WorkItem> ProcessedWorkItems { get; set; }
        public string MergedData { get; set; }
    }
}

Этот код передает объект WorkItem из каждого блока в следующий и обогащает его на каждом этапе. Затем он создает окончательный список со всеми рабочими элементами за месяц, прежде чем запустить процесс агрегации и сохранить результаты.

Этот код основан на фиктивных методах для каждого этапа с использованием имен, которые вы используете. Это не так много, но мы надеемся продемонстрировать решение. Например, LoadData передает путь к файлу и просто добавляет к нему некоторый текст и передает строку, но, очевидно, он может загрузить реальный файл и передать строку содержимого, если на самом деле был файл на диске.

Аналогично, чтобы имитировать выполнение работы в ProcessData, мы делаем Thread.SpinWait, а затем просто добавляем некоторый текст в строку. Отсюда и задержка, поэтому измените номер, если хотите, чтобы он работал быстрее или медленнее. Код был написан на .NET Framework, но он работает на Core 3.0, а также на Ubuntu и OSX. Единственное отличие состоит в том, что цикл SpinWait может быть значительно длиннее или короче, поэтому вы можете поиграть с задержкой.

Обратите внимание, что мы могли бы объединиться в waitForProcessedDataBlock и иметь именно тот конвейер, который вы запрашивали. Это было бы немного более запутанным

Код в конце создает файлы на диске, но также выводит результаты на экран, так что на самом деле это не нужно.

Если вы установите параллелизацию на 1, вы обнаружите, что она замедляется примерно на ожидаемую величину. У меня Windows-машина с четырьмя ядрами и она чуть хуже, чем в четыре раза медленнее.

0 голосов
/ 07 ноября 2019

Я бы использовал вложенный блок, чтобы не разбивать мои ежемесячные данные, а затем снова объединять их. Вот пример двух вложенных TransformBlock, которые обрабатывают все дни 2020 года:

var monthlyBlock = new TransformBlock<int, List<string>>(async (month) =>
{
    var dailyBlock = new TransformBlock<int, string>(async (day) =>
    {
        await Task.Delay(100); // Simulate async work
        return day.ToString();
    }, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 4 });

    foreach (var day in Enumerable.Range(1, DateTime.DaysInMonth(2020, month)))
        await dailyBlock.SendAsync(day);
    dailyBlock.Complete();

    var dailyResults = await dailyBlock.ToListAsync();
    return dailyResults;
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1 });

foreach (var month in Enumerable.Range(1, 12))
    await monthlyBlock.SendAsync(month);
monthlyBlock.Complete();

Для сбора ежедневных результатов внутреннего блока я использовал метод расширения ToListAsync, который показан ниже:

public static async Task<List<T>> ToListAsync<T>(this IReceivableSourceBlock<T> block,
    CancellationToken cancellationToken = default)
{
    var list = new List<T>();
    while (await block.OutputAvailableAsync(cancellationToken).ConfigureAwait(false))
    {
        while (block.TryReceive(out var item))
        {
            list.Add(item);
        }
    }
    return list;
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...