Как установить приоритет процессора для нескольких блоков ActionBlock, работающих одновременно? - PullRequest
0 голосов
/ 12 марта 2020

У меня есть куча ActionBlocks, каждый из которых делает что-то свое.

  • Большой обрабатывает данные и непрерывно передает данные TransformBlock.
  • 3. другие ActionBlocks просто записывают строки в 3 текстовых файлах (журналы).

Это вроде работает, за исключением того, что 3 журнала ActionBlocks начинают потреблять данные только после завершения обработки ActionBlock (так они записывают всю информацию о регистрации в один go в конце программы).

Мне было интересно, смогу ли я повлиять на это поведение, чтобы придать более высокий приоритет записи ActionBlocks?

Спасибо за вашу помощь.

Пример кода:

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace dataflowtest
{
    class Program
    {
        const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        static readonly IReadOnlyCollection<string> charsSets = Enumerable.Repeat(chars, 8).ToList().AsReadOnly();
        static readonly Random random = new Random();

        static event EventHandler<string> MessageGot;

        static async Task Main(string[] args)
        {
            var source = new TransformBlock<string, string>(GetMessage, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1, EnsureOrdered = false });
            var target = new ActionBlock<string>(Console.WriteLine);


            var programDir = Path.GetDirectoryName(System.Reflection.Assembly.GetEntryAssembly().GetName().CodeBase.Replace("file:///", ""));
            using var file1 = new StreamWriter(Path.Combine(programDir, "file1.txt"));
            using var file2 = new StreamWriter(Path.Combine(programDir, "file2.txt"));
            using var file3 = new StreamWriter(Path.Combine(programDir, "file3.txt"));

            var fileAction1 = new ActionBlock<string>(file1.WriteLineAsync);
            var fileAction2 = new ActionBlock<string>(file2.WriteLineAsync);
            var fileAction3 = new ActionBlock<string>(file3.WriteLineAsync);

            MessageGot += async (_, e) => await fileAction1.SendAsync(e);
            MessageGot += async (_, e) => await fileAction2.SendAsync(e);
            MessageGot += async (_, e) => await fileAction3.SendAsync(e);

            using (source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true }))
            {
                for (int i = 0; i < 100; i++)
                {
                    await source.SendAsync(i.ToString() + '\t' + new string(charsSets.Select(s => s[random.Next(s.Length)]).ToArray()));
                }

                source.Complete();
                await target.Completion;
            }
        }

        private static async Task<string> GetMessage(string input)
        {
            int delay = random.Next(25, 6000);
            await Task.Delay(delay);
            string message = input.ToLowerInvariant() + '\t' + delay.ToString();

            MessageGot?.Invoke(null, message);

            return message;
        }
    }
}

1 Ответ

2 голосов
/ 12 марта 2020

По умолчанию StreamWriter будет буферизовать sh свой буфер каждые 4096 байт. Вы, вероятно, хотите, чтобы это было гриппом sh на каждой написанной строке. Таким образом, вместо этого:

var fileAction1 = new ActionBlock<string>(file1.WriteLineAsync);

... сделайте это:

var fileAction1 = new ActionBlock<string>(item =>
{
    file1.WriteLine(item);
    file1.Flush();
});

Нет смысла использовать WriteLineAsync вместо WriteLine, поскольку базовый FileStream не был открыт с опцией FileOptions.Asynchronous.

...