RX: Как ждать подписчиков до конца? - PullRequest
1 голос
/ 06 мая 2019

У меня есть производитель, который загружает данные на страницах из Rest API, и несколько потребителей, которые обрабатывают страницы (например, загружают их в базу данных).

Я бы хотел, чтобы производитель и потребители работали параллельно,Это означает, что производитель не должен ждать, пока страница будет использована, прежде чем загружать следующую.Каждый потребитель должен обрабатывать страницы последовательно.

Когда все страницы загружены, основной поток должен ждать, пока все потребители завершат свою работу (поскольку потребление может занять больше времени, чем производство).

Мой текущийПодход заключается в следующем:

Я создал заметку, которая загружает страницы, которая начинается, как только присоединяются подписчики.Я настроил подписчиков для наблюдения в их собственных потоках для параллельного выполнения.

Код на C #:

IEnumerable<Page> getPages = produce();
var observable = getPages.ToObservable().Publish();

observable
   .ObserveOn(NewThreadScheduler.Default)
   .Subscribe(page => consume1(page));

observable
   .ObserveOn(NewThreadScheduler.Default)
   .Subscribe(page => consume2(page));

observable.Connect();

Проблема с этой реализацией состоит в том, что основной поток может завершить работу до того, как будут обработаны все страницы.и приложение останавливается.

Как мне добиться этого с помощью RX?

Спасибо!

Редактировать:

Пробовал также следующий подход (из ответа):

static void Main(string[] args)
{
    var getPages = Enumerable.Range(0, 10);

    var els1 = new EventLoopScheduler();
    var els2 = new EventLoopScheduler();

    var observable =
        getPages
            .ToObservable()
            .Publish(ps =>
                Observable
                    .Merge(
                        ps.Select(p => Observable.Start(() => consume1(p), els1)),
                        ps.Select(p => Observable.Start(() => consume2(p), els2))));

    observable.Wait();
}

public static void consume1(int p)
{
    Console.WriteLine($"1:{p}");
    Thread.Sleep(200);
}

public static void consume2(int p)
{
    Console.WriteLine($"2:{p}");
    Thread.Sleep(100);
}

observable.Wait () возвращается, как только нижележащий перечисляемый завершает получение значений.Выходные данные:

1:0
2:0

Просто чтобы доказать, что если мы заменим getPages на:

var getPages = Enumerable.Range(0, 10)
    .Select(i =>
    {
        Console.WriteLine($"Produced {i}");
        Thread.Sleep(30);
        return i;
    });

, то получим:

Produced 0
Produced 1
1:0
2:0
Produced 2
Produced 3
Produced 4
2:1
Produced 5
Produced 6
Produced 7
1:1
2:2
Produced 8
Produced 9

1 Ответ

2 голосов
/ 06 мая 2019

Я думаю, что это то, что вы хотите:

var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();

var observable =
    getPages
        .ToObservable()
        .Publish(ps =>
            Observable
                .Merge(
                    ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
                    ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));

Я написал этот тестовый код:

var getPages = Enumerable.Range(0, 10);

var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();

var observable =
    getPages
        .ToObservable()
        .Publish(ps =>
            Observable
                .Merge(
                    ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
                    ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));

observable.Wait();

public void consume1(int p)
{
    Console.WriteLine($"1:{p}");
    Thread.Sleep(200);
}

public void consume2(int p)
{
    Console.WriteLine($"2:{p}");
    Thread.Sleep(100);
}

Я получил этот вывод:

1:0
2:0
2:1
1:1
2:2
2:3
1:2
2:4
2:5
1:3
2:6
2:7
1:4
2:8
2:9
1:5
1:6
1:7
1:8
1:9

Когда вы закончили с EventLoopScheduler экземплярами, вы должны вызвать .Dispose() для них, чтобы закрыть потоки.

...