Блокировка обработки потока данных TPL - PullRequest
1 голос
/ 04 апреля 2019

Я подписан на канал данных в реальном времени и поддерживаю состояние на основе полученных данных.Обычно все данные принимаются по порядку, но в случае, когда сообщение отбрасывается, я буферизую сообщения, получаю снимок состояния через REST API, а затем воспроизводю буфер, пропуская любые сообщения с идентификатором, предшествующим Id.один указан в снимке.В настоящее время я делаю следующее:

class StateManager
{
  private long _lastId;
  private bool _isSyncing;
  private object _syncLock;

  private Dictionary<decimal,decimal> _state;  
  private ConcurrentQueue<SocketMessage> _messageBuffer;

  private ManualResetEvent _messageEvent;
  private ManualResetEvent _processingEvent;

  public StateManager( DataSocket socket )
  {
    _isSyncing = false;
    _syncLock = new object();

    _state = new Dictionary<decimal,decimal>();
    _messageBuffer = new ConcurrentQueue<SocketMessage>();

    socket.OnMessage += OnSocketMessage;
    Task.Factory.StartNew( MessageProcessingThread, TaskCreationOptions.LongRunning );
  }

  public void ApplySnapshot( Snapshot snapshot )
  {
    lock( _syncLock )
    {
      if( _isSyncing ) return;

      _isSyncing = true;
      _processingEvent.Reset();
    }

    // Apply the snapshot to the state...

    _isSyncing = false;
    _processingEvent.Set();
  }

  private void OnSocketMessage( object sender, SocketMessage msg )
  {
    _messageBuffer.Enqueue( msg );
    _messageEvent.Set();
  }

  private async Task MessageProcessingThread()
  {
    while(true)
    {
      _messageEvent.WaitOne();
      while(true)
      {
        _processingEvent.WaitOne();
        if( !_messageBuffer.TryDequeue( out var msg ) )
        {
          _messageEvent.Reset();
          break;
        }
        ApplyToState( msg );
      }

    }
  }
}

Это отлично работает, но я чувствую, что это немного неаккуратно и может работать лучше при больших нагрузках.Таким образом, я смотрю на переход к Microsoft.Tpl.Dataflow, который будет обрабатывать выполнение очередей и обработку для меня.Тем не менее, я использовал Dataflow раньше, и у меня есть проблема:

Есть ли способ, которым я могу приостановить выполнение ActionBlock так, что он будет буферизовать новые задачи, но не обрабатывать их, пока ярезюме? В случае, когда я обнаруживаю пропущенное сообщение, мне нужно иметь возможность приостановить обработку до применения нового снимка, а затем возобновить и обработать все буферизованные сообщения.

Я мог быпросто используйте _processingEvent внутри ActionBlock, но я чувствую, что это вызовет кучу проблем.Во-первых, это блокировало бы задачу, вызывая запуск большего количества задач, и это блокировало бы, быстро заполняя внутреннюю очередь задач TPL.Кроме того, это приведет к одновременному завершению всех заблокированных задач, возможно, из-за неправильного порядка, что вызовет другое событие повторной синхронизации.

Если это невозможно с TPL, есть ли лучший способ сделать это?это?

...