Реактивное расширение .NetCore Наблюдение за MainThread - PullRequest
0 голосов
/ 25 июня 2018

Я пытаюсь выполнить много операций в сети параллельно, и я хочу установить таймаут для каждой операции.

Поскольку в Parallel.ForEach нет простой опции тайм-аута, я использую System.Reactive.

это мой код:

public void networkOps(List<MacCpe> source, Action<List<Router>, List<Exception>> onDone) {

var routers = new List<Router>();
var exceptions = new List<Exception>();

Observable.Defer(() => source.ToObservable()) 
    .ObserveOn(Scheduler.CurrentThread)
            .SubscribeOn(Scheduler.Default)
            .SelectMany(it => 
                Observable.Amb(
                            Observable.Start(() => {
                                switch(it.type) {
                                    case AntennaType.type1: {
                                        //network stuff
                                    }
                                    break;

                                    case AntennaType.type2: {
                                       //network stuff
                                    }
                                    break;

                                    case AntennaType.type3: {
                                       //network stuff
                                    }
                                    break;

                                    case AntennaType.type4: {
                                      //network stuff
                                    }
                                    break;

                                    default: throw new NullReferenceException("Nothing");
                                }
                            }).Select(_ => true),
                            Observable.Timer(TimeSpan.FromSeconds(60)).Select(_ => false)
                        ),
                        (it, result) => new { it, result }
                    )
                    .Subscribe (
                        x => {
                            Console.WriteLine("checked item number " + x.it.Id);
                        },
                        ex => {
                            Console.WriteLine("error string");
                        }, () => {
                            onDone(routers, exceptions);
                        }
                    );
}

Я использую оператор Observable.Amb для параллельного запуска таймера 60 секунд, который работает как тайм-аут.

Однако, когда я запускаю этот метод, программа немедленно завершает работу, не обращаясь к обратному вызову onDone.

В сети я вижу, что я могу использовать ObserveOnDispatcher для наблюдения в потоке пользовательского интерфейса при выполнении кода блокировки в пуле потоков, но я использую это для ядра dotnet в linux на стороне сервера терминальных приложений.

Как можно наблюдать за «основным потоком» в консольном приложении?

Заранее спасибо за ответы.

1 Ответ

0 голосов
/ 26 июня 2018

Когда вы заменяете Parallel.ForEach, звучит так, как будто вы счастливы, когда выполняете операцию блокировки. Использование Rx в том виде, в котором вы его настроили, не является блокирующей операцией, поэтому метод немедленно завершается.

Это очень просто исправить. Просто измените .Subscribe на это:

.Do(
    x =>
    {
        Console.WriteLine("checked item number " + x.it.Id);
    },
    ex =>
    {
        Console.WriteLine("error string");
    }, () =>
    {
        onDone(routers, exceptions);
    }
)
.Wait();

Я бы также избавился от твоих .ObserveOn(Scheduler.CurrentThread) и .SubscribeOn(Scheduler.Default), пока ты не будешь уверен, что они тебе нужны.

...