Получать параллельные асинхронные запросы и обрабатывать их по одному - PullRequest
0 голосов
/ 05 мая 2018

Фон

У нас есть сервисная операция, которая может принимать одновременные асинхронные запросы и должна обрабатывать эти запросы по одному.

В следующем примере метод UploadAndImport(...) получает параллельные запросы в нескольких потоках, но его вызовы метода ImportFile(...) должны выполняться по одному за раз.

Описание личности

Представьте себе склад со множеством рабочих (несколько потоков). Люди (клиенты) могут отправлять на склад много пакетов (запросов) одновременно (одновременно). Когда посылка приходит в себя, работник берет на себя ответственность за нее от начала до конца, и человек, уронивший посылку, может уйти (запустить и забыть). Задача рабочих состоит в том, чтобы складывать каждую упаковку в небольшой желоб, и только один работник может одновременно положить упаковку в желоб, иначе наступит хаос. Если человек, который бросил пакет, регистрируется позже (конечная точка опроса), склад должен иметь возможность сообщить, прошел пакет по желобу или нет.

Вопрос

Тогда возникает вопрос, как написать служебную операцию, которая ...

  1. может получать параллельные клиентские запросы,
  2. получает и обрабатывает эти запросы в нескольких потоках,
  3. обрабатывает запросы в том же потоке, который получил запрос,
  4. обрабатывает запросы по одному,
  5. - это односторонняя операция «забей и забудь», а
  6. имеет отдельную конечную точку опроса, которая сообщает о завершении запроса.

Мы попробовали следующее и задаемся вопросом:

  1. Существуют ли какие-либо расовые условия, которые мы не рассмотрели?
  2. Есть ли более канонический способ кодирования этого сценария в C # .NET с сервис-ориентированной архитектурой (мы используем WCF)?

Пример: что мы попробовали?

Это сервисный код, который мы попробовали. Это работает, хотя это похоже на что-то вроде хака или клуджа.

static ImportFileInfo _inProgressRequest = null;

static readonly ConcurrentDictionary<Guid, ImportFileInfo> WaitingRequests = 
    new ConcurrentDictionary<Guid, ImportFileInfo>();

public void UploadAndImport(ImportFileInfo request)
{
    // Receive the incoming request
    WaitingRequests.TryAdd(request.OperationId, request);

    while (null != Interlocked.CompareExchange(ref _inProgressRequest, request, null))
    {
        // Wait for any previous processing to complete
        Thread.Sleep(500);
    }

    // Process the incoming request
    ImportFile(request);

    Interlocked.Exchange(ref _inProgressRequest, null);
    WaitingRequests.TryRemove(request.OperationId, out _);
}

public bool UploadAndImportIsComplete(Guid operationId) => 
    !WaitingRequests.ContainsKey(operationId);

Это пример кода клиента.

private static async Task UploadFile(FileInfo fileInfo, ImportFileInfo importFileInfo)
{
    using (var proxy = new Proxy())
    using (var stream = new FileStream(fileInfo.FullName, FileMode.Open, FileAccess.Read))
    {
        importFileInfo.FileByteStream = stream;
        proxy.UploadAndImport(importFileInfo);
    }

    await Task.Run(() => Poller.Poll(timeoutSeconds: 90, intervalSeconds: 1, func: () =>
    {
        using (var proxy = new Proxy())
        {
            return proxy.UploadAndImportIsComplete(importFileInfo.OperationId);
        }
    }));
}

Трудно написать минимальный жизнеспособный пример этого на скрипке, но вот начало , которое дает смысл и которое компилируется.

Как и прежде, вышеприведенное выглядит как хак / клудж, и мы спрашиваем как о потенциальных подводных камнях в его подходе, так и об альтернативных шаблонах, которые являются более подходящими / каноническими.

Ответы [ 3 ]

0 голосов
/ 05 мая 2018

Проблема в том, что ваша общая пропускная способность очень мала - одновременно может выполняться только одно задание - и вы хотите обрабатывать параллельные запросы. Это означает, что время очереди может сильно отличаться. Возможно, это не лучший выбор для реализации вашей очереди заданий в оперативной памяти, поскольку это сделает вашу систему гораздо более хрупкой и ее будет сложнее масштабировать по мере роста вашего бизнеса.

Традиционный, масштабируемый способ создания архитектуры:

  • Служба HTTP для приема запросов, с балансировкой нагрузки / резервной, без состояния сеанса.
  • База данных SQL Server для сохранения запросов в очереди, возвращая постоянный уникальный идентификатор задания.
  • Служба Windows, обрабатывающая очередь, по одному заданию за раз и помечающая задания как завершенные. Рабочий процесс для службы, вероятно, будет однопоточным.

Для этого решения необходимо выбрать веб-сервер. Распространенным выбором является IIS под управлением ASP.NET. На этой платформе каждый запрос гарантированно обрабатывается однопоточным способом (т. Е. Вам не нужно слишком беспокоиться о состоянии гонки), но из-за функции, называемой thread agility , запрос может заканчиваться другим потоком, но в исходном контексте синхронизации, что означает, что вы, вероятно, никогда не заметите, если не будете отлаживать и проверять идентификаторы потоков.

0 голосов
/ 07 мая 2018

Учитывая контекст ограничений нашей системы, это реализация, которую мы в итоге использовали:

static ImportFileInfo _importInProgressItem = null;

static readonly ConcurrentQueue<ImportFileInfo> ImportQueue = 
    new ConcurrentQueue<ImportFileInfo>();

public void UploadAndImport(ImportFileInfo request) {
    UploadFile(request);
    ImportFileSynchronized(request);
}

// Synchronize the file import, 
// because the database allows a user to perform only one write at a time.
private void ImportFileSynchronized(ImportFileInfo request) {
    ImportQueue.Enqueue(request);
    do {
        ImportQueue.TryPeek(out var next);
        if (null != Interlocked.CompareExchange(ref _importInProgressItem, next, null)) {
            // Queue processing is already under way in another thread.
            return;
        }

        ImportFile(next);
        ImportQueue.TryDequeue(out _);
        Interlocked.Exchange(ref _importInProgressItem, null);
    }
    while (ImportQueue.Any());
}

public bool UploadAndImportIsComplete(Guid operationId) =>
    ImportQueue.All(waiting => waiting.OperationId != operationId);

Это решение хорошо работает для ожидаемых нагрузок. Эта загрузка включает в себя максимум 15-20 одновременных загрузок файлов PDF. Пакет из 15-20 файлов, как правило, приходит сразу, а затем замолкает на несколько часов, пока не прибудет следующая партия.

Критика и обратная связь приветствуются.

0 голосов
/ 05 мая 2018

Простое решение, использующее шаблон Producer-Consumer для передачи запросов в случае ограничения числа потоков.

Вам все еще нужно реализовать простой репортер прогресса или событие. Я предлагаю заменить дорогой метод опроса асинхронной связью, предлагаемой библиотекой Microsoft SignalR . Он использует WebSocket для включения асинхронного поведения. Клиент и сервер могут зарегистрировать свои обратные вызовы на концентраторе. Используя RPC, клиент теперь может вызывать методы на стороне сервера и наоборот. Вы будете публиковать информацию о прогрессе на клиенте, используя концентратор (на стороне клиента). По моему опыту, SignalR очень прост в использовании и хорошо документирован. Он имеет библиотеку для всех известных серверных языков (например, Java).

Опрос в моем понимании является полной противоположностью «забей и забудь». Вы не можете забыть, потому что вы должны проверить что-то на основе интервала. Общение на основе событий, такое как SignalR, запускается и забывается, так как вы запускаете и получите напоминание (потому что вы забыли). «Сторона события» будет вызывать ваш обратный вызов вместо вас, ожидающих сделать это самостоятельно!

Требование 5 игнорируется, поскольку у меня нет причин. Ожидание завершения потока уничтожит огонь и забудет персонажа.

private BlockingCollection<ImportFileInfo> requestQueue = new BlockingCollection<ImportFileInfo>();
private bool isServiceEnabled;
private readonly int maxNumberOfThreads = 8;
private Semaphore semaphore = new Semaphore(numberOfThreads);
private readonly object syncLock = new object();

public void UploadAndImport(ImportFileInfo request) 
{            
  // Start the request handler background loop
  if (!this.isServiceEnabled)
  {
    this.requestQueue?.Dispose();
    this.requestQueue = new BlockingCollection<ImportFileInfo>();

    // Fire and forget (requirement 4)
    Task.Run(() => HandleRequests());
    this.isServiceEnabled = true;
  }

  // Cache multiple incoming client requests (requirement 1) (and enable throttling)
  this.requestQueue.Add(request);
}

private void HandleRequests()
{
  while (!this.requestQueue.IsCompleted)
  {
    // Wait while thread limit is exceeded (some throttling)
    this.semaphore.WaitOne();

    // Process the incoming requests in a dedicated thread (requirement 2) until the BlockingCollection is marked completed.
    Task.Run(() => ProcessRequest());
  }

  // Reset the request handler after BlockingCollection was marked completed
  this.isServiceEnabled = false;
  this.requestQueue.Dispose();
}

private void ProcessRequest()
{
  ImportFileInfo request = this.requestQueue.Take();
  UploadFile(request);

  // You updated your question saying the method "ImportFile()" requires synchronization.
  // This a bottleneck and will significantly drop performance, when this method is long running. 
  lock (this.syncLock)
  {
    ImportFile(request);
   }

  this.semaphore.Release();
}

Примечания:

  • BlockingCollection - это IDisposable
  • TODO: Вы должны "закрыть" BlockingCollection, отметив его завершенным: «BlockingCollection.CompleteAdding ()» или он будет неопределенно повторять цикл в ожидании дальнейших запросов. Возможно, вы вводите дополнительные методы запроса для клиента, чтобы отменить и / или обновить процесс и пометить добавление в BlockingCollection как выполненное. Или таймер, который ожидает время простоя, прежде чем пометить его как завершенный. Или сделайте ваш запрос обработчиком потока или спином.
  • Замените Take () и Add (...) на TryTake (...) и TryAdd (...), если вам нужна поддержка отмены
  • Код не тестируется
  • Ваш метод ImportFile () является узким местом в вашей многопоточной среде. Я предлагаю сделать это потокобезопасным. В случае ввода-вывода, который требует синхронизации, я бы кэшировал данные в коллекции BlockingCollection, а затем записывал их по одному в один / один.
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...