Объединение нескольких наблюдаемых и обновление существующих подписчиков? - PullRequest
0 голосов
/ 05 июня 2019

Как объединить горячие наблюдаемые, которые могут иметь или не иметь подписчиков, в новую наблюдаемую и продолжать предоставлять все новые данные существующим подписчикам?

В качестве примера представим, что у нас есть такой класс:


class SomeClass
{

  IObservable<string> Actions { get; set; } = Observable.Empty<string>();

  void AddActionCreator(IObservable<string> creator)
  {
    Actions = Actions.Merge(creator);
  }
}

Проблема, с которой я сталкиваюсь, заключается в том, что если AddActionCreator добавляет новый поток действий, то все предыдущие подписчики SomeClass.Actions, которые подписались до объединения этого нового потока, никогда не получат новые действия.

1 Ответ

2 голосов
/ 05 июня 2019

Легко делать то, что ты хочешь. Здесь вам нужны SelectMany и Subject<IObservabe<string>>.

Вот класс, который вам нужен:

public class SomeClass
{
    private Subject<IObservable<string>> _sources = new Subject<System.IObservable<string>>();
    public IObservable<string> Actions { get; private set; } = null;

    public SomeClass()
    {
        this.Actions = _sources.SelectMany(x => x);
    }

    public void AddActionCreator(IObservable<string> creator)
    {
        _sources.OnNext(creator);
    }
}

Теперь вы можете использовать его так:

var sc = new SomeClass();
sc.Actions.Subscribe(x => Console.WriteLine($"1:{x}"));
sc.AddActionCreator(Observable.Return("Hello"));
sc.Actions.Subscribe(x => Console.WriteLine($"2:{x}"));
sc.AddActionCreator(Observable.Range(0, 3).Select(x => $"{x}"));
sc.Actions.Subscribe(x => Console.WriteLine($"3:{x}"));
sc.AddActionCreator(Observable.Return("World"));

Вы получите этот вывод:

1:Hello
1:0
1:1
1:2
2:0
2:1
2:2
1:World
2:World
3:World

Вы можете видеть, что новые наблюдаемые добавляются к существующим подписчикам.

...