Наблюдаемый, который сочетает в себе поведение groupby с comblatest и демонстрирует поведение "внешнего соединения" - PullRequest
2 голосов
/ 04 апреля 2020

Я пытаюсь использовать реактивную парадигму для создания наблюдаемой, которая действует как комбинация «группа по» и комбинация. У меня есть две исходные наблюдаемые, которые имеют общий ключ объединения, как в следующих двух структурах данных:

class Foo
{
    string Key;
    string Funky;
}

class Bar
{
    string Key;
    string Town;
}

Мне нужна наблюдаемая, которая дает мне последнюю комбинацию этих двух, соединенных на InstrumentID. Конечный результат должен выглядеть примерно так:

class Target
{
   string Key;
   string Funky;
   string Town;
}

и демонстрирует поведение, подобное «внешнему объединению», означающее, что первая последовательность, создающая новый «ключ», даст класс Target с другой стороной, равной нулю, и затем, как только другая сторона также выдаст тот же ключ присоединения, последний с обеих сторон будет получен всякий раз, когда есть новое значение в любой последовательности для данного ключа.

Ответы [ 3 ]

0 голосов
/ 04 апреля 2020

Это не может быть "кошерным" по некоторым стандартам, но работает для того, что мне нужно для этого. Публикация для тех, кто ищет ту же функциональность (NET версия RX)

public static class Extensions
{
    public static IObservable<TResult> CombineLatestGrouped<TFirst,TSecond,TKey, TResult>(
        this IObservable<TFirst> first, 
        IObservable<TSecond> second, 
        Func<TFirst, TKey> firstKeySelector, 
        Func<TSecond, TKey> secondKeySelector, 
        Func<TKey,TFirst,TSecond,TResult> resultSelector)

    {
        var dic = new ConcurrentDictionary<TKey,Tuple<TFirst,TSecond>>();

        return Observable.Create<TResult>(obs =>
        {
            var d1 = first
            .Select(x =>
            {
                var key = firstKeySelector(x);
                var tuple = dic.AddOrUpdate(
                    key,
                    addValueFactory: key => Tuple.Create(x, default(TSecond)),
                    updateValueFactory: (key, existing) => Tuple.Create(x, existing.Item2));
                return resultSelector(key, tuple.Item1, tuple.Item2);
            })
            .Subscribe(obs);

            var d2 = second
            .Select(x =>
            {
                var key = secondKeySelector(x);
                var tuple = dic.AddOrUpdate(
                    key,
                    addValueFactory: key => Tuple.Create(default(TFirst), x),
                    updateValueFactory: (key, existing) => Tuple.Create(existing.Item1, x));
                return resultSelector(key, tuple.Item1, tuple.Item2);
            })
            .Subscribe(obs);

            return new CompositeDisposable(d1, d2);
        });
    }
}
0 голосов
/ 04 апреля 2020

Как я понимаю, вы хотели бы следующее:

новый "ключ" даст класс Target с другой стороной, равной нулю

Если оставить или правая сторона выдает НОВЫЙ ключ (пред .: ноль или другой)

, а затем однажды другая сторона также выдает тот же ключ соединения,

предварительное условие: поток испущен значение - другой поток теперь выдает значение и ключ для левого и правого эквалайзера

последние с обеих сторон выдаются всякий раз, когда есть новое значение в любой последовательности для данного ключа.

испускать полную цель (составленную из левого, правого) каждого левого, правого испускания, когда значение left, right заметно меняется

RxJava2 решение для моего предположения:

  @Test
  void test2() {
    PublishSubject<Foo> foo$ = PublishSubject.create();
    PublishSubject<Bar> bar$ = PublishSubject.create();

    Observable<Target> target$ = Observable.merge(Arrays.asList(foo$, bar$))
        // filter invalid values
        .filter(hasId -> hasId.key() != null)
        .scan(Target.NULL, (prev, change) -> {
          // when prev. target and current value#key are eq -> emit composed value
          if (change.key().equals(prev.key)) {
            return composedTarget(prev, change);
          } else if (change instanceof Foo) {
            return Target.fromFoo((Foo) change);
          } else if (change instanceof Bar) {
            return Target.fromBar((Bar) change);
          }
          return prev;
        }).filter(target -> target != Target.NULL)
        .distinctUntilChanged();

    TestObserver<Target> test = target$.test();

    // emit
    foo$.onNext(new Foo("123", "f1"));
    // emit
    bar$.onNext(new Bar("123", "f2"));
    // emit
    bar$.onNext(new Bar("123", "f3"));
    // skipped
    foo$.onNext(new Foo("123", "f1"));
    // emit
    foo$.onNext(new Foo("123", "f5"));
    // emit
    foo$.onNext(new Foo("key", "value"));
    // emit
    foo$.onNext(new Foo("key2", "value2"));
    // emit
    bar$.onNext(new Bar("bar2", "Berlin"));
    // emit
    foo$.onNext(new Foo("foo2", "Funkeey"));

    test.assertValues(
        new Target("123", "f1", null),
        new Target("123", "f1", "f2"),
        new Target("123", "f1", "f3"),
        new Target("123", "f5", "f3"),
        new Target("key", "value", null),
        new Target("key2", "value2", null),
        new Target("bar2", null, "Berlin"),
        new Target("foo2", "Funkeey", null)
    );
  }

  private Target composedTarget(Target prev, HasId change) {
    if (change instanceof Foo) {
      Foo foo = (Foo) change;
      return new Target(prev.key, foo.funky, prev.town);
    }
    if (change instanceof Bar) {
      Bar bar = (Bar) change;
      return new Target(prev.key, prev.funky, bar.town);
    }
    return prev;
  }

Domain-Classes

  interface HasId {
    String key();
  }

  static final class Foo implements HasId {
    final String key;
    final String funky;

    Foo(String key, String funky) {
      this.key = key;
      this.funky = funky;
    }

    @Override
    public String key() {
      return key;
    }
  }

  static final class Bar implements HasId {
    String key;
    String town;

    Bar(String key, String town) {
      this.key = key;
      this.town = town;
    }

    @Override
    public String key() {
      return key;
    }
  }

  static final class Target {
    private static final Target NULL = new Target(null, null, null);
    final String key;
    final String funky;
    final String town;

    Target(String key, String funky, String town) {
      this.key = key;
      this.funky = funky;
      this.town = town;
    }

    static Target fromFoo(Foo foo) {
      return new Target(foo.key, foo.funky, null);
    }

    static Target fromBar(Bar bar) {
      return new Target(bar.key, null, bar.town);
    }

    @Override
    public boolean equals(Object o) {
      if (this == o) return true;
      if (o == null || getClass() != o.getClass()) return false;
      Target target = (Target) o;
      return key.equals(target.key) &&
          Objects.equals(funky, target.funky) &&
          Objects.equals(town, target.town);
    }

    @Override
    public int hashCode() {
      return Objects.hash(key, funky, town);
    }

    @Override
    public String toString() {
      return "Target{" +
          "key='" + key + '\'' +
          ", funky='" + funky + '\'' +
          ", town='" + town + '\'' +
          '}';
    }
  }

Пожалуйста, исправьте мои предположения, если я ошибаюсь. Решение может быть реализовано лучше в C# с сопоставлением с шаблоном. На самом деле, если C# имеет типы объединения, такие как F #, это было бы лучше.

0 голосов
/ 04 апреля 2020

Допустим, ваш поток foo$ выдает значения типа Foo, а поток bar$ выдает значения Bar. Вот как вы можете их объединить:

combineLatest([
  foo$,
  bar$
    // use startWith(null) to ensure combineLatest will emit as soon as foo$ emits, not waiting for bar$ to emit its first value
    .pipe(startWith(null))
]).pipe(
  map(([foo, bar]) => ({
    // always keep all properties from foo
    ...foo,
    // only add properties from bar if it has the matching Key
    ...(bar && bar.Key === foo.Key ? bar : null)
  }))
)
...