На самом деле я делаю нечто подобное в другом проекте.Что я узнал или сделал бы по-другому:
Прежде всего, лучше использовать выделенные потоки для цикла чтения / записи (с new Thread(ParameterizedThreadStart)
), потому что Task.Run
используетпоток пула, и когда вы используете его в (почти) бесконечном цикле, поток практически никогда не возвращается в пул.
var thread = new Thread(ReaderLoop) { Name = nameof(ReaderLoop) }; // priority, etc if needed
thread.Start(cancellationToken);
Ваш Process
может быть событием, которое выможет вызываться асинхронно, поэтому ваш цикл чтения может быть немедленно возвращен для обработки новых входящих пакетов как можно быстрее:
private void ReaderLoop(object state)
{
var token = (CancellationToken)state;
while (!token.IsCancellationRequested)
{
try
{
var message = MessageQueue.Take(token);
OnMessageReceived(new MessageReceivedEventArgs(message));
}
catch (OperationCanceledException)
{
if (!disposed && IsRunning)
Stop();
break;
}
}
}
Обратите внимание, что если у делегата несколько целей, то это асинхронный вызовне тривиально.Я создал этот метод расширения для вызова делегата в потоках пула:
public static void InvokeAsync<TEventArgs>(this EventHandler<TEventArgs> eventHandler, object sender, TEventArgs args)
{
void Callback(IAsyncResult ar)
{
var method = (EventHandler<TEventArgs>)ar.AsyncState;
try
{
method.EndInvoke(ar);
}
catch (Exception e)
{
HandleError(e, method);
}
}
foreach (EventHandler<TEventArgs> handler in eventHandler.GetInvocationList())
handler.BeginInvoke(sender, args, Callback, handler);
}
Таким образом, реализация OnMessageReceived
может быть:
protected virtual void OnMessageReceived(MessageReceivedEventArgs e)
=> messageReceivedHandler.InvokeAsync(this, e);
Наконец это был большой урок, что у BlockingCollection<T>
есть некоторые проблемы с производительностью.Он использует SpinWait
внутри, чей метод SpinOnce
ждет все дольше и дольше, если в течение длительного времени нет входящих данных.Это сложная проблема, потому что даже если вы зарегистрируете каждый отдельный шаг обработки, вы не заметите, что все запускается с задержкой, если только вы не можете посмеяться над серверной стороной. Здесь вы можете найти быструю реализацию BlockingCollection
, использующую AutoResetEvent
для запуска входящих данных.Я добавил к нему перегрузку Take(CancellationToken)
следующим образом:
/// <summary>
/// Takes an item from the <see cref="FastBlockingCollection{T}"/>
/// </summary>
public T Take(CancellationToken token)
{
T item;
while (!queue.TryDequeue(out item))
{
waitHandle.WaitOne(cancellationCheckTimeout); // can be 10-100 ms
token.ThrowIfCancellationRequested();
}
return item;
}
По сути, все.Может быть, не все применимо в вашем случае, например.если почти немедленный ответ не имеет решающего значения, обычный BlockingCollection
также сделает это.