Использование Rx для постановки в очередь операций, которые я не хочу выполнять до определенного времени? - PullRequest
3 голосов
/ 25 августа 2011

Резюме: у меня есть веб-приложение, которое выполняет рабочие процессы на бизнес-объектах, и иногда мне нужно намеренно ждать несколько секунд или минут между шагами.Я ищу (возможно, через Rx.NET), чтобы улучшить выполнение этих рабочих процессов, чтобы я не исчерпал ThreadPool и сделал веб-сайт не отвечающим, когда система находится под большой нагрузкой.

Очень упрощенная версиярабочий процесс:

  1. Создание объекта
  2. Загрузка данных в него из системы A
  3. POST эти данные в систему B

Если система A не работает, мое приложение ждет и повторяет попытку позже.Время ожидания смоделировано после увеличивающихся задержек повторения в GMail: подождите 1 секунду, удваивайте при каждой последующей повторной попытке (максимально через 1 час).Приложение одержимо сохраняет состояние в базе данных, поэтому, если все приложение взорвется, при перезапуске оно возобновит все рабочие процессы, с которых оно остановилось.

В настоящее время (пожалуйста, будьте осторожны) каждый шаг в рабочем процессе выполняется путем вызоваThreadPool.QueueUserWorkItem, чтобы поставить в очередь метод, который вызывает Thread.Sleep, если необходимо для задержки повторения, описанной выше, а затем фактически выполняет шаг.

Если система работает хорошо (без ошибок), она может легко обработать всетрафик, который мы добавляем, и ThreadPool прекрасно управляет параллельным выполнением всех этих экземпляров рабочего процесса.Но если система B некоторое время не работает, число повторных попыток и, соответственно, задержка возрастают, и довольно скоро ThreadPool заполняется всеми спящими потоками, в результате чего веб-сайт перестает отвечать на новые запросы.

По сути, я хочувыбросить все эти ожидающие рабочие процессы в очередь, упорядоченную по (время последнего выполнения + желаемая задержка повторных попыток).Несмотря на то, что я много читал о Rx и восхищался им, у меня никогда не было возможности его использовать, но, похоже, это может быть полезным способом справиться с этим.Если Rx может волшебным образом управлять выплевыванием этих объектов, когда они готовы к стрельбе, кажется, что это

  1. значительно упростит и прояснит эту логику и
  2. предотвратит расточительное использование лотовпотоков, которые просто спят 99% времени

Будем весьма благодарны за любые указания новичку в Rx, даже если это просто объяснение того, почему это на самом деле не очень хороший вариант использования для Rx.

Ответы [ 3 ]

4 голосов
/ 26 августа 2011

В этом случае я мог бы придерживаться вашего текущего решения из-за этого бита:

Приложение одержимо сохраняет состояние в базе данных, поэтому, если все приложение взорвется, то при перезапуске оно будетвозобновить все рабочие процессы, где он остановился.

"Возобновить" конвейер (т. е. x.Where().Select().Timeout().Bla()) с помощью десериализации при запуске - сложно ...

Трудно дать вамБолее подробное решение без дополнительной информации, на самом деле оно может работать довольно хорошо с Rx, если вы не пытаетесь смоделировать весь поток, только бит транзакции (т. е. загрузить из A, отправить в B).

В любом случае, способ решения проблемы исчерпания пула потоков - через класс System.Threading.Timer, который сообщает пулу потоков просто дождаться истечения времени ожидания, прежде чем ставить в очередь новый элемент.

2 голосов
/ 26 августа 2011

Вам определенно придется адаптировать:

public IDisposable StartProcess<T>(Action<T> load, Action<T> post) where T : new()
{
    return StartProcess(TimeSpan.FromSeconds(1), new T())
                .Do(load)
                .Subscribe(post);
}

private IObservable<long> StartProcess<T>(TimeSpan span, T obj) where T : new()
{
    Observable
        .Interval(span)
        .OnErrorResumeNext(Observable.Defer(() => StartProcess(IncreaseSpan(span), obj)))
        .Concat(Observable.Defer(() => StartProcess(TimeSpan.FromSeconds(1), new T())));
}

private TimeSpan IncreaseSpan(TimeSpan span)
{
    return TimeSpan.FromSeconds(span.TotalSeconds < 1800? span.TotalSeconds * 2 : 3600);
}

Теперь я бы предпочел создать экземпляр загрузки и заполнить объект, а не делать это явно, поскольку функциональное программирование препятствует изменчивости, и вы можете пожелать loadна самом деле перейдите в базу данных и восстановите состояние, как вы упомянули.

Я не был уверен, хотите ли вы сохранить объект состояния в случае сбоя вызова post или load, и вам потребуетсяадаптироваться, потому что в настоящее время он сохранит состояние при сбоях load или post и снова вызовет load без свежего состояния, если произойдет сбой post, что, безусловно, не то, что вы хотите сделать.

Я не тестировал код, но Rx подходит для того, что вы хотите сделать.

1 голос
/ 26 августа 2011

Проверьте этот пост на форумах Rx. Довольно удобный оператор для задачи, которую вы хотите решить: http://social.msdn.microsoft.com/Forums/en-US/rx/thread/af43b14e-fb00-42d4-8fb1-5c45862f7796/

Rx - отличный способ справиться с такими проблемами (и в частности), потому что вы можете иметь свои асинхронные функции / наблюдаемые и применять к ним универсальные операторы, такие как описанный оператор Retry.

...