Может ли ActionBlock содержать состояние? - PullRequest
1 голос
/ 10 марта 2020

Я пишу приложение, которое использует поток данных TPL. Я пытаюсь настроить блок действий для записи в базу данных.

Однако мне нужен этот блок действий для выполнения шага инициализации первого полученного сообщения (обратите внимание, что я должен дождаться первого сообщения и не могу выполнить инициализацию во время создания блока действий).

Из-за этого моему блоку действий необходимо поддерживать какое-то состояние, указывающее, получил ли оно уже первое сообщение.

Возможно ли для ActionBlock поддерживать состояние?

Ссылка Пример кода Microsoft, приведенный ниже, как бы я go сообщил о добавлении переменной состояния в ActionBlock? Кажется, что он поддерживает только локальные переменные.

// Performs several computations by using dataflow and returns the elapsed
// time required to perform the computations.
static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism,
   int messageCount)
{
   // Create an ActionBlock<int> that performs some work.
   var workerBlock = new ActionBlock<int>(
      // Simulate work by suspending the current thread.
      millisecondsTimeout => Thread.Sleep(millisecondsTimeout),
      // Specify a maximum degree of parallelism.
      new ExecutionDataflowBlockOptions
      {
         MaxDegreeOfParallelism = maxDegreeOfParallelism
      });

   // Compute the time that it takes for several messages to 
   // flow through the dataflow block.

   Stopwatch stopwatch = new Stopwatch();
   stopwatch.Start();

   for (int i = 0; i < messageCount; i++)
   {
      workerBlock.Post(1000);
   }
   workerBlock.Complete();

   // Wait for all messages to propagate through the network.
   workerBlock.Completion.Wait();

   // Stop the timer and return the elapsed number of milliseconds.
   stopwatch.Stop();
   return stopwatch.Elapsed;
}

1 Ответ

2 голосов
/ 12 марта 2020

Вы можете реализовать свой собственный StatefulActionBlock<T>, вот так. В зависимости от вашего MaxDegreeOfParallelism блокировка может не понадобиться (и даже если вы это сделаете, могут быть более эффективные способы обеспечения безопасности потоков). Спасибо @TheodorZoulias за помощь в уточнении этого подхода.

public class StatefulActionBlock<TInput, TState> : IDataflowBlock, ITargetBlock<TInput>
{
   private bool _initialized;

   private Action<TState> _initializer;

   private object _lock = new object();

   private ITargetBlock<TInput> _actionBlock;

   private TState _state;

   public Task Completion => _actionBlock.Completion;

   public StatefulActionBlock(Action<TInput> action, Action<TState> initializer, TState state, ExecutionDataflowBlockOptions options)
   {
       //null checks omitted...

       _initializer = initializer;
       _actionBlock = new ActionBlock<TInput>(action, options);
       _state = state;
   }

   void Initialize()
   {
       _initializer(_state);
       _initialized = true;
   }

   public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept) 
   {
       lock (_lock)
       {
           if (!_initialized)
               Initialize();
       }
       return _actionBlock.OfferMessage(messageHeader, messageValue, source, consumeToAccept);
   }

   public void Complete() =>
       _actionBlock.Complete();

   public void Fault(Exception exception) =>
       _actionBlock.Fault(exception);
}

Вы также можете заблокировать и проверить, инициализированы ли вы в вашем действии.

private static object _lock = new Object();
private static bool _isInitialized = false;

// Performs several computations by using dataflow and returns the elapsed
// time required to perform the computations.
static TimeSpan TimeDataflowComputations(int maxDegreeOfParallelism, int messageCount)
{
   // Create an ActionBlock<int> that performs some work.
   var workerBlock = new ActionBlock<int>(
      // Simulate work by suspending the current thread.
      DoStuff,
      // Specify a maximum degree of parallelism.
      new ExecutionDataflowBlockOptions
      {
         MaxDegreeOfParallelism = maxDegreeOfParallelism
      });

   // Compute the time that it takes for several messages to 
   // flow through the dataflow block.

   Stopwatch stopwatch = new Stopwatch();
   stopwatch.Start();

   for (int i = 0; i < messageCount; i++)
   {
      workerBlock.Post(1000);
   }
   workerBlock.Complete();

   // Wait for all messages to propagate through the network.
   workerBlock.Completion.Wait();

   // Stop the timer and return the elapsed number of milliseconds.
   stopwatch.Stop();
   return stopwatch.Elapsed;
}

private static void DoStuff(int i)
{
    lock (_lock)
    {
       if (!_initialized)
       {
          Initialize();
          _initialized = true;
       }
    }
    Thread.Sleep(i); //make a snack
}

private static void Initialize()
{
   //...
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...