У меня есть куча ActionBlocks
, каждый из которых делает что-то свое.
- Большой обрабатывает данные и непрерывно передает данные
TransformBlock
. - 3. другие
ActionBlocks
просто записывают строки в 3 текстовых файлах (журналы).
Это вроде работает, за исключением того, что 3 журнала ActionBlocks
начинают потреблять данные только после завершения обработки ActionBlock
(так они записывают всю информацию о регистрации в один go в конце программы).
Мне было интересно, смогу ли я повлиять на это поведение, чтобы придать более высокий приоритет записи ActionBlocks
?
Спасибо за вашу помощь.
Пример кода:
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace dataflowtest
{
class Program
{
const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
static readonly IReadOnlyCollection<string> charsSets = Enumerable.Repeat(chars, 8).ToList().AsReadOnly();
static readonly Random random = new Random();
static event EventHandler<string> MessageGot;
static async Task Main(string[] args)
{
var source = new TransformBlock<string, string>(GetMessage, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1, EnsureOrdered = false });
var target = new ActionBlock<string>(Console.WriteLine);
var programDir = Path.GetDirectoryName(System.Reflection.Assembly.GetEntryAssembly().GetName().CodeBase.Replace("file:///", ""));
using var file1 = new StreamWriter(Path.Combine(programDir, "file1.txt"));
using var file2 = new StreamWriter(Path.Combine(programDir, "file2.txt"));
using var file3 = new StreamWriter(Path.Combine(programDir, "file3.txt"));
var fileAction1 = new ActionBlock<string>(file1.WriteLineAsync);
var fileAction2 = new ActionBlock<string>(file2.WriteLineAsync);
var fileAction3 = new ActionBlock<string>(file3.WriteLineAsync);
MessageGot += async (_, e) => await fileAction1.SendAsync(e);
MessageGot += async (_, e) => await fileAction2.SendAsync(e);
MessageGot += async (_, e) => await fileAction3.SendAsync(e);
using (source.LinkTo(target, new DataflowLinkOptions { PropagateCompletion = true }))
{
for (int i = 0; i < 100; i++)
{
await source.SendAsync(i.ToString() + '\t' + new string(charsSets.Select(s => s[random.Next(s.Length)]).ToArray()));
}
source.Complete();
await target.Completion;
}
}
private static async Task<string> GetMessage(string input)
{
int delay = random.Next(25, 6000);
await Task.Delay(delay);
string message = input.ToLowerInvariant() + '\t' + delay.ToString();
MessageGot?.Invoke(null, message);
return message;
}
}
}