.Net RX: отслеживание прогресса параллельного выполнения - PullRequest
3 голосов
/ 01 января 2012

Мне нужно выполнить несколько длительных операций параллельно, и я хотел бы каким-то образом сообщить о прогрессе. Из моего первоначального исследования кажется, что IObservable вписывается в эту модель. Идея состоит в том, что я вызываю метод, который возвращает IObservable из int, где int сообщается о завершении в процентах, параллельное выполнение начинается сразу после выхода из метода, эта наблюдаемая должна быть горячей наблюдаемой, чтобы все подписчики изучали одну и ту же информацию о ходе выполнения в определенный момент времени. например, Поздний подписчик может только узнать, что все выполнение завершено, и больше нет отслеживания прогресса.

Ближайший подход к этой проблеме, который я обнаружил, заключается в использовании Observable.ForkJoin и Observable.Start, но я не могу понять, как сделать их единственной наблюдаемой, которую я могу вернуть из метода.

Пожалуйста, поделитесь своими идеями о том, как этого достичь, или, возможно, есть другой подход к этой проблеме с использованием .Net RX.

Ответы [ 2 ]

3 голосов
/ 02 января 2012

Чтобы сделать горячую наблюдаемость, я бы, вероятно, начал с метода, который использует BehaviorSubject в качестве возвращаемого значения и способ выполнения операций.Если вы просто хотите пример, пропустите до конца.Остальная часть этого ответа объясняет шаги.

Для этого ответа я буду предполагать, что ваши длительные операции не имеют своего собственного способа вызова асинхронно.Если они это сделают, следующий шаг может быть немного другим.Следующее, что нужно сделать, это отправить работу в другой поток, используя IScheduler.Вы можете разрешить вызывающей стороне выбирать, где происходит работа, создавая перегрузку, которая при желании принимает планировщик в качестве параметра (в этом случае перегрузка, которая не выберет планировщик по умолчанию).Существует довольно много перегрузок IScheduler.Scheduler, некоторые из которых являются методами расширений, поэтому вы должны просмотреть их, чтобы увидеть, какой из них наиболее подходит для вашей ситуации;Я использую на это занимает всего Action здесь.Если у вас есть несколько операций, которые все могут выполняться параллельно, вы можете вызывать scheduler.Schedule несколько раз.

Самая сложная часть этого, вероятно, будет заключаться в определении хода выполнения в любой заданной точке.Если вы выполняете несколько операций одновременно, вам, вероятно, нужно будет отслеживать, сколько из них выполнено, чтобы знать, каков текущий прогресс.С предоставленной вами информацией я не могу быть более точной, чем эта.

Наконец, если ваши операции можно отменить, вы можете принять CancellationToken в качестве параметра.Вы можете использовать это, чтобы отменить операцию, пока она находится в очереди планировщика до ее запуска.Если вы правильно написали свой код операции, он также может использовать токен для отмены.

IObservable<int> DoStuff(/*args*/, 
                         CancellationToken cancel,
                         IScheduler scheduler)
{
    BehaviorSubject<int> progress;
    //if you don't take it as a parameter, pick a scheduler
    //IScheduler scheduler = Scheduler.ThreadPool;

    var disp = scheduler.Schedule(() =>
    {
        //do stuff that needs to run on another thread

        //report progres
        porgress.OnNext(25);
    });
    var disp2 = scheduler.Schedule(...);

    //if the operation is cancelled before the scheduler has started it,
    //you need to dispose the return from the Schedule calls
    var allOps = new CompositeDisposable(disp, disp2);
    cancel.Register(allOps.Dispose);

    return progress;
}
0 голосов
/ 01 января 2012

Вот один подход

// setup a method to do some work, 
// and report it's own partial progress
Func<string, IObservable<int>> doPartialWork = 
    (arg) => Observable.Create<int>(obsvr => {
        return Scheduler.TaskPool.Schedule(arg,(sched,state) => {
            var progress = 0;
            var cancel = new BooleanDisposable();
            while(progress < 10 && !cancel.IsDisposed)
            {
                // do work with arg
                Thread.Sleep(550);
                obsvr.OnNext(1); //report progress
                progress++;
            }
            obsvr.OnCompleted();
            return cancel;
        });
    });

var myArgs = new[]{"Arg1", "Arg2", "Arg3"};

// run all the partial bits of work
// use SelectMany to get a flat stream of 
// partial progress notifications
var xsOfPartialProgress =  
        myArgs.ToObservable(Scheduler.NewThread)
              .SelectMany(arg => doPartialWork(arg))
                  .Replay().RefCount();

// use Scan to get a running aggreggation of progress
var xsProgress = xsOfPartialProgress
                   .Scan(0d, (prog,nextPartial)  
                            => prog + (nextPartial/(myArgs.Length*10d)));
...