Есть ли лучший, возможно, более надежный шаблон, чем «запускать и забывать» для одновременной обработки переменного числа асинхронных задач? - PullRequest
3 голосов
/ 01 мая 2020

У меня следующий код:

while (!cancellationToken.IsCancellationRequested)
{
    var connection = await listener.AcceptAsync(cancellationToken);

    HandleConnectionAsync(connection, cancellationToken)
        .FireAndForget(HandleException);
}

FireAndForget - это метод расширения:

public static async void FireAndForget(this ValueTask task, Action<Exception> exceptionHandler)
{
    try
    {
        await task.ConfigureAwait(false);
    }
    catch (Exception e)
    {
        exceptionHandler.Invoke(e);
    }
}

while loop - это жизненный цикл сервера. Когда новое соединение принято, оно запускает некоторую «фоновую задачу», чтобы оно могло обработать это новое соединение, а затем while loop возвращается к принятию новых соединений, ничего не ожидая - пауза жизненного цикла.

Я не могу await HandleConnectionAsync (приостановить жизненный цикл) здесь, потому что я хочу немедленно принять другое соединение (если оно есть) и иметь возможность обрабатывать несколько соединений одновременно. HandleConnectionAsync привязан к вводу / выводу и обрабатывает одно соединение за раз до закрытия (задача завершается через некоторое время).

Соединения должны обрабатываться отдельно - я не хочу, чтобы возникала ситуация, когда какая-то ошибка в то время как обработка одного соединения оказывает какое-либо влияние на другие соединения.

Решение, которое я здесь использую, «срабатывай и забывай» работает, но общее правило - всегда await асинхронные методы и никогда не использовать async void.

Кажется, что я нарушил правила, так что есть ли лучший, возможно, более надежный способ обрабатывать переменное (количество задач изменяется во времени) одновременно число задач, связанных с асинхронным вводом / выводом, в описанной ситуации здесь?

Дополнительная информация:

  • Каждый вызов AcceptAsync выделяет системные ресурсы даже перед возвратом соединения, и я хочу по возможности избегать этого (соединение не может быть возвращено для часы (код может «ждать» несколько часов) - пока какой-нибудь внешний клиент не решит подключиться к моему серверу). Лучше предположить, что это метод, который я не хочу вызывать одновременно / параллельно - достаточно одного AcceptAsync за один раз
  • Пожалуйста, примите во внимание, что я могу иметь миллионы клиентов на день подключения и отключения к моему серверу и серверу (while loop) может работать много-много дней
  • Я не знаю, сколько подключений мне нужно будет обработать в указанное c время
  • Я знаю максимальное количество соединений, которые моя программа сможет обрабатывать одновременно
  • Если я достигну предела maximum number of connections, AcceptAsync не вернет новое соединение, пока не закроется какое-то другое активное соединение, поэтому Мне не нужно беспокоиться об этом, но любое решение, основанное на этом ограничении, должно учитывать, что активные соединения могут быть закрыты, и мне все еще нужно обрабатывать новые соединения - количество соединений меняется со временем. У «запускай и забывай» проблем с этим нет
  • Код для HandleConnectionAsync не имеет значения - он просто обрабатывает одно соединение за раз, пока не закроется (задача завершится через некоторое время) и привязан к вводу / выводу (HandleConnectionAsync обрабатывает одно соединение за раз, но, конечно, мы можем запускать несколько HandleConnectionAsync задач для одновременной обработки нескольких соединений - это то, что я делал с помощью «запускай и забывай»)

Ответы [ 3 ]

4 голосов
/ 01 мая 2020

Я предполагаю, что переход на что-то вроде SignalR не является приемлемым решением. Это было бы моей первой рекомендацией.

Пользовательские сокеты сервера - это сценарий, в котором допускается какое-то «увольнение». Я рассматриваю возможность добавления типа AsyncEx типа «Диспетчер задач», чтобы упростить решение такого рода, но пока не сделал этого.

Суть в том, что вам нужно управлять списком соединений сами. Объект «соединения» может включать Task, который представляет цикл обработки; хорошо. Также полезно (особенно для целей отладки или управления) иметь и другие свойства, такие как удаленный IP-адрес.

Так что я бы подошел к этому примерно так:

private readonly object _mutex = new object();
private readonly List<State> _connections = new List<State>();

private void Add(State state)
{
  lock (_mutex)
    _connections.Add(state);
}
private void Remove(State state)
{
  lock (_mutex)
    _connections.Remove(state);
}

public async Task RunAsync(CancellationToken cancellationToken)
{
  while (true)
  {
    var connection = await listener.AcceptAsync(cancellationToken);

    Add(new State(this, connection));
  }
}

private sealed class State
{
  private readonly Parent _parent;

  public State(Parent parent, Connection connection, CancellationToken cancellationToken)
  {
    _parent = parent;
    Task = ExecuteAsync(connection, cancellationToken);
  }

  private static async Task ExecuteAsync(Connection connection, CancellationToken cancellationToken)
  {
    try { await HandleConnectionAsync(connection, cancellationToken); }
    finally { _parent.Remove(this); }
  }

  public Task Task { get; }
  // other properties as desired, e.g., RemoteAddress
}

Теперь у вас есть коллекция соединений. Вы можете либо игнорировать задачи в объектах State (как это делается в приведенном выше коде), что похоже на запуск и забывание. Или вы можете await их всех в какой-то момент. Например:

public async Task RunAsync(CancellationToken cancellationToken)
{
  try
  {
    while (true)
    {
      var connection = await listener.AcceptAsync(cancellationToken);

      Add(new State(this, connection));
    }
  }
  catch (OperationCanceledException)
  {
    // Wait for all connections to cancel.
    // I'm not really sure why you would *want* to do this, though.
    List<State> connections;
    lock (_mutex) { connections = _connections.ToList(); }
    await Task.WhenAll(connections.Select(x => x.Task));
  }
}

Затем можно легко расширить объект State, чтобы вы могли выполнять действия, которые иногда полезны для серверного приложения, например:

  • Список всех удаленные адреса, к которым подключен этот сервер.
  • Подождите, пока не будет установлено указанное c соединение.
  • ...

Примечания:

  • Используйте один шаблон для отмены. Передача токена приведет к OperationCanceledException, что является нормальным шаблоном отмены. Код также ранее делал while (!IsCancellationRequested), что приводило к успешному завершению при отмене, что не является нормальным шаблоном отмены. Поэтому я удалил это, чтобы код больше не использовал два шаблона отмены.
  • При работе с необработанными сокетами, в общем случае, вы должны постоянно читать (даже когда вы запись) и периодически запись (даже если у вас нет данных для отправки). Таким образом, ваш HandleConnectionAsync должен запустить асинхронный считыватель и записывающее устройство, а затем использовать Task.WhenAll.
  • Я удалил вызов на HandleException, потому что (вероятно) все, что он делает, должно обрабатываться State.ExecuteAsync. Нетрудно добавить его обратно в случае необходимости.
2 голосов
/ 01 мая 2020

Если существует ограничение на максимальное количество разрешенных одновременных задач, вы должны использовать SemaphoreSlim:

int allowedConcurrent = //..
var semaphore = new SemaphoreSlim(allowedConcurrent);
var tasks = new List<Task>();

while (!cancellationToken.IsCancellationRequested)
{
    Func<Task> func = async () =>
    {
        var connection = await listener.AcceptAsync(cancellationToken);
        await HandleConnectionAsync(connection, cancellationToken);

        semaphore.Release();
    };

    await semaphore.WaitAsync(); // Will return immediately if the number of concurrent tasks does not exceed allowed

    tasks.Add(func());
}

await Task.WhenAll(tasks);

. Это накапливает задачи в списке, тогда Task.WhenAll может их ждать все для завершения.

0 голосов
/ 01 мая 2020

Перво-наперво:

  1. Не делайте асин c void ...

Тогда вы можете реализовать шаблон производителя / потребителя для этого, приведенный ниже псевдокод - это просто руководство, вам нужно убедиться, что ваш потребитель является синглтоном в вашем приложении

public class Data
{
    public Uri Url { get; set; }
}

public class Producer
{
    private Consumer _consumer = new Consumer();
    public void DoStuff()
    {
        var data = new Data();
        _consumer.Enqueue(data);
    }
}

public class Consumer
{
    private readonly List<Data> _toDo = new List<Data>();
    private bool _stop = false;
    public Consumer()
    {
        Task.Factory.StartNew(Loop);
    }

    private async Task Loop()
    {
        while (!_stop)
        {
            Data toDo = null;
            lock (_toDo)
            {
                if (_toDo.Any())
                {
                    toDo = _toDo.First();
                    _toDo.RemoveAt(0);
                }
            }

            if (toDo != null)
            {
                await DoSomething(toDo);
            }

            Thread.Sleep(TimeSpan.FromSeconds(1));
        }
    }

    private async Task DoSomething(Data toDo)
    {
        // YOUR ASYNC STUFF HERE
    }

    public void Enqueue(Data data)
    {
        lock (_toDo)
        {
            _toDo.Add(data);
        }
    }
}

Итак, ваш метод вызова производит то, что вам нужно для выполнения фоновой задачи, а потребитель выполняет это, это другое Огонь и забудь. Также следует учитывать, что произойдет, если что-то пойдет не так на уровне приложения, если вы сохраните данные в Consumer.Enqueue (), чтобы при повторном запуске приложения можно было выполнить недостающую работу ... Надеюсь, это поможет

...