GroupJoin с одним окном, заканчивающимся при завершении потоков - PullRequest
0 голосов
/ 14 февраля 2020

У меня есть два источника данных, которые выгружают данные из отдельного потока. Я пытаюсь соединить оба источника ключом. Я могу сделать это с помощью GroupJoin. Я должен использовать Observable.Never, чтобы окно никогда не заканчивалось. Все работает нормально с точки зрения объединения потоков. Когда оба источника завершают сброс данных, они вызывают их соответствующие onComplete на Observers. Я ожидаю завершения потоковой передачи, как только будет получено OnComplete для обоих источников. Поскольку я использовал Observable.Never, поток никогда не заканчивается, и событие Oncomplete моего aggSource никогда не вызывается.

есть ли способ сообщить Rx, что закрывается окно, когда OnComplete получает для обоих источников вместо того, чтобы бесконечно держать его открытым?

Я новичок в Rx и не уверен, что этого можно достичь , Ниже приведен фрагмент кода. Заранее спасибо !!

var l = Source1;
var r = Source2;

var q = r.GroupJoin(l,
                _ => Observable.Never<Unit>(), // windows from each left event going on forever 
                _ => Observable.Never<Unit>(), // windows from each right event going on forever
                (left, obsOfRight) => Tuple.Create(left, obsOfRight)); // create tuple of left event with observable of right events

        var joinSource =   q.SelectMany(e => {
            return e.Item2.Where(
               x =>
               {
                 return x.ID== e.Item1.ID;
               })
               .Select(v=>  (Item1:v.Value, Item2: e.Item1.Value));
        });



var aggSource = joinSource.GroupBy(x => x.Item1).SelectMany(grp =>
            {
                return grp.Scan(0.0, (accumulator, current) => accumulator + current.Item2).Select(z => (Group: grp.Key, Value: z));
            });



aggSource.Subscribe(x => dictResults[x.Group] = x,
              y => { Console.WriteLine("Error Ocurred: " + y.Message); completed = true; },
              () => { completed = true; Console.WriteLine("Subcription comnpleted"); }
              );
// dict results is dictionary which is my projection which is shown to View. Right now my view is just console window.

1 Ответ

1 голос
/ 16 февраля 2020

Селектор длительности контролирует перекрытие окна соединения - которое нам нужно прервать, когда завершится любой из источников. Во-первых, мы будем использовать LastOrDefaultAsync для получения уведомления при выдаче OnComplete.

var either = Observable.CombineLatest(l.LastOrDefaultAsync(), r.LastOrDefaultAsync());

Теперь мы можем изменить селектор длительности из образца:

var q = r.GroupJoin(l,
                _ => Observable.Never<Unit>().TakeUntil(either), // windows from each left event until l or r completes
                _ => Observable.Never<Unit>().TakeUntil(either), // windows from each right event until l or r completes
                (left, obsOfRight) => Tuple.Create(left, obsOfRight)); // create tuple of left event with observable of right events

Это должно вызвать каскад и завершить оставшуюся часть наблюдаемого конвейера.

...