Как разбить наблюдаемый поток на куски, зависящие от второго потока? - PullRequest
9 голосов
/ 12 января 2012

Я думал, что это легко, но мой мозг сейчас тает ..

Проблема

Учитывая следующее IObservable<int> Поток: 1 1 0 0 0 1 0 0 1 0 1

Я хочу разделить его на IObservable<IEnumerable<int>> Поток вида

1

1 0 0 0

1 0 0

1 0

1

поэтому, когда есть 0, он просто добавляется в IEnumerable, а когда появляется 1, запускается новый список; Это немного более точное определение моей реальной проблемы.

Мой подход пока

Я подумал, что хорошим решением было бы сначала преобразовать его в IObservable<IObservable<int>> с помощью метода Window, а затем использовать ToEnumerable, но почему-то у меня не получается работать ... Я использовал Zip и Skip(1), чтобы получить diff до последнего элемента, я тоже использовал DistinctUntilChanged(). Я избавляю вас от всех вариантов, которые я пробовал ...

Вероятно, самым близким я был этот код:

int[] ints = new[] { 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1 };
var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Take(11).Select(i => ints[i]);

Subject<int> subject = new Subject<int>();
observable.Subscribe(subject);

var observableDiff = subject.Skip(1).Zip(subject, (n, p) => new { Previous = p, Next = n });
var windows = observable.Window(() => observableDiff.Where(x => x.Next == 1));

int index = 0;
windows.Subscribe(window =>
{
  Console.WriteLine(string.Format("new window [{0}] ", index++));
  window.Subscribe(number => Console.WriteLine(number));
});

Это дает хорошие результаты, но, к сожалению, в конце вылетает.

new window [0]
1
new window [1]
1
0
0
0
new window [2]
1
0
0
new window [3]
1
0
new window [4]
new window [5]
new window [6]
new window [7]
new window [8]
new window [9]
<-- it goes on here until window ~ [80] with a stackoverflow exception

Если бы этой ошибки в моем коде не было, я бы ее достиг ...

Любая помощь будет принята с благодарностью. :)

Редактировать: я использую Rx-Experimental, но это не имеет значения (проверено с LinqPad). Также удалил тему, она не повлияла ни на что. Кажется, с моим новым подходом (Edit2) вам нужен предмет, иначе начало окон будет совершенно странным.

Edit2: немного изменил проблему, чтобы лучше выделить мою проблему, извините. Также обновил мое решение.

Ответы [ 3 ]

12 голосов
/ 13 января 2012

Это сработало для меня:

var ints = (new[] { 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1 }).ToObservable();

var result =
    ints
        .Publish(ns =>
            ns
                .Where(n => n == 1)
                .Select(n =>
                    ns.TakeWhile(m => m == 0).StartWith(n).ToArray())
        ).Merge();

Я использовал Publish, чтобы убедиться, что наблюдаемая ints рассматривается как "горячая", а не "холодная".

Мои результаты выглядят так:

Grouped ints

2 голосов
/ 13 января 2012

Встроенный Buffer кажется довольно близко к тому, что вам нужно. Промежуточная подписка между источником и вызовом Buffer позволит вам получить наблюдаемые значения закрытия, необходимые для Buffer.

IObservable<IList<T>> Buffer<T>(IObservable<T> source, 
                                Func<T, bool> startNew)
{
    return Observable.Create<IList<T>>(
        obs =>
        {
            var starts = new Subject<Unit>();
            return source.Do(v => 
                             {
                                if (startNew(v))
                                    starts.OnNext(Unit.Default);
                             })
                         .Buffer(() => starts)
                         .Where(v => v != null && v.Count > 0)
                         .Subscribe(obs);
        });
}
1 голос
/ 20 января 2012

Хорошо, это тоже хорошие ответы с форумов Rx :

Предложение Джеймса Майлза:

var source = new[] { 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1 }.ToObservable();

var windows = 
from window in source
  .Buffer(2,1) // create an overlapping buffer with 2 items
  .Publish(xs => xs.Window(() => xs.Where(x => x.Last() == 1))) // close the window if the 2nd item is == 1
from result in window
  .Select(buffer => buffer.First()) // we are only interested in the first item (the 2nd item might be the 1!)
  .ToArray() // aggregate the results of the window
where result.Any() // filter out final (empty) window
select result;

int index = 0;
windows.Subscribe(window =>
{
  Console.WriteLine(string.Format("new window [{0}] ", index++));
  foreach(var x in window)Console.WriteLine(x);
});

Дейв Секстон предложил использовать класс Parser из Расширения для реактивных расширений (Rxx) , что представляется более семантическим подходом:

using Rxx.Parsers.Reactive.Linq;

public sealed class SplitLab : BaseConsoleLab
{
    protected override void Main()
    {

        var xs = Observable.Generate(
            new[] { 1, 1, 0, 0, 0, 1, 0, 0, 1, 0, 1 }.GetEnumerator(),
            e => e.MoveNext(),
            e => e,
            e => (int) e.Current,
            e => TimeSpan.FromSeconds(.5));

        var query = xs.Parse(parser =>
            from next in parser
            let one = next.Where(value => value == 1)
            let other = next.Not(one)
            let window = from start in one
                        from remainder in other.NoneOrMore()
                        select remainder.StartWith(start)
            let windowAsString = window.Join()
            select windowAsString);

        using (query.Subscribe(TraceLine))
        {
            WaitForKey();
        }
    }
}

Так много дорог в Риме.

...