Я подписан на канал данных в реальном времени и поддерживаю состояние на основе полученных данных.Обычно все данные принимаются по порядку, но в случае, когда сообщение отбрасывается, я буферизую сообщения, получаю снимок состояния через REST API, а затем воспроизводю буфер, пропуская любые сообщения с идентификатором, предшествующим Id.один указан в снимке.В настоящее время я делаю следующее:
class StateManager
{
private long _lastId;
private bool _isSyncing;
private object _syncLock;
private Dictionary<decimal,decimal> _state;
private ConcurrentQueue<SocketMessage> _messageBuffer;
private ManualResetEvent _messageEvent;
private ManualResetEvent _processingEvent;
public StateManager( DataSocket socket )
{
_isSyncing = false;
_syncLock = new object();
_state = new Dictionary<decimal,decimal>();
_messageBuffer = new ConcurrentQueue<SocketMessage>();
socket.OnMessage += OnSocketMessage;
Task.Factory.StartNew( MessageProcessingThread, TaskCreationOptions.LongRunning );
}
public void ApplySnapshot( Snapshot snapshot )
{
lock( _syncLock )
{
if( _isSyncing ) return;
_isSyncing = true;
_processingEvent.Reset();
}
// Apply the snapshot to the state...
_isSyncing = false;
_processingEvent.Set();
}
private void OnSocketMessage( object sender, SocketMessage msg )
{
_messageBuffer.Enqueue( msg );
_messageEvent.Set();
}
private async Task MessageProcessingThread()
{
while(true)
{
_messageEvent.WaitOne();
while(true)
{
_processingEvent.WaitOne();
if( !_messageBuffer.TryDequeue( out var msg ) )
{
_messageEvent.Reset();
break;
}
ApplyToState( msg );
}
}
}
}
Это отлично работает, но я чувствую, что это немного неаккуратно и может работать лучше при больших нагрузках.Таким образом, я смотрю на переход к Microsoft.Tpl.Dataflow
, который будет обрабатывать выполнение очередей и обработку для меня.Тем не менее, я использовал Dataflow раньше, и у меня есть проблема:
Есть ли способ, которым я могу приостановить выполнение ActionBlock
так, что он будет буферизовать новые задачи, но не обрабатывать их, пока ярезюме? В случае, когда я обнаруживаю пропущенное сообщение, мне нужно иметь возможность приостановить обработку до применения нового снимка, а затем возобновить и обработать все буферизованные сообщения.
Я мог быпросто используйте _processingEvent
внутри ActionBlock
, но я чувствую, что это вызовет кучу проблем.Во-первых, это блокировало бы задачу, вызывая запуск большего количества задач, и это блокировало бы, быстро заполняя внутреннюю очередь задач TPL.Кроме того, это приведет к одновременному завершению всех заблокированных задач, возможно, из-за неправильного порядка, что вызовет другое событие повторной синхронизации.
Если это невозможно с TPL, есть ли лучший способ сделать это?это?