Реактивная Rx zip очередь в .Net - PullRequest
0 голосов
/ 17 мая 2018

Я довольно новичок в концепции реактивного программирования.Я использую Bonsai , который предоставляет некоторые, но не все команды .Net rx через c #.

Я пытаюсь получить поведение, подобное этой мраморной диаграмме:

input1: ---1--------2--------3--------4--------5--------6--------7
input2: -------abc----------------------------------def-----------
result: ------------a--------b--------c--------c---------d-------e

По сути, вход 2 генерирует волны событий, которые должны храниться в очереди.Вход 1 действует как триггер для вывода отдельных элементов из этой очереди.

Когда очередь пуста, последний элемент очереди должен быть выпущен.Я пробовал различные комбинации zip и combLatest, но я не могу получить желаемое поведение.

Я также пробовал реализацию WithLatestFrom на основе этого поста , но я понимаю, что это тожене собирается производить желаемое поведение.

public IObservable<Tuple<TSource, TOther>> Process<TSource, TOther>(
            IObservable<TSource> source,
            IObservable<TOther> other)
        {


            // return source1.WithLatestFrom(source2, (xs, ys) => Tuple.Create(xs, ys));
            return source.Publish(os => other.Select(a => os.Select(b => Tuple.Create(b, a))).Switch());
        }

Существуют ли какие-либо операторы или комбинации операторов, которые будут производить это поведение?Я могу сделать реализацию для Бонсай, как только пойму, какие операторы использовать.

ОБНОВЛЕНИЕ 1: 2018/05/18

Основываясь на посте Стража, я написал новыйкласс DiscriminatedUnion внутри пространства имен Bonsai.Мне не удалось указать соответствующие типы, хотя.Компилятор утверждает, что «аргументы типа для Merge не могут быть выведены» (в .Merge(input1.Select...).Где я могу добавить правильную спецификацию типа?

using System.Reactive.Linq;
using System.ComponentModel;
using System.Collections.Immutable;    
namespace Bonsai.Reactive
{
    [Combinator]
   // [XmlType(Namespace = Constants.XmlNamespace)]
    [Description("Implementation of Discriminated Union")]
    public class DiscriminatedUnion
    {
        public IObservable<int?> Process<TInput1, TInput2>(
           IObservable<TInput1> input1,
            IObservable<TInput2> input2)
        {
            var merged =
                        input2.Select(s2 => Tuple.Create(2, (TInput2)s2))
                        .Merge(input1.Select(s1 => Tuple.Create(1, (TInput1)s1)))
                        .Scan(Tuple.Create((int?)null, new Queue<int>(), 0), (state, val) =>
                        {
                            int? next = state.Item1;
                            if (val.Item1 == 1)
                            {
                                if (state.Item2.Count > 0)
                                {
                                    next = state.Item2.Dequeue();
                                }
                            }
                            else
                            {
                                state.Item2.Enqueue(val.Item2);
                            }
                            return Tuple.Create(next, state.Item2, val.Item1);
                        })
                        .Where(x => (x.Item1 != null && x.Item3 == 1))
                        .Select(x => x.Item1);
            return merged;
        }
    }
}

Ответы [ 3 ]

0 голосов
/ 18 мая 2018

Вот тестируемое представление вашей проблемы (или мраморная диаграмма), используя пакет NuGet Microsoft.Reactive.Testing:

var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(1000.Ms(), 1),
    ReactiveTest.OnNext(2000.Ms(), 2),
    ReactiveTest.OnNext(3000.Ms(), 3),
    ReactiveTest.OnNext(4000.Ms(), 4),
    ReactiveTest.OnNext(5000.Ms(), 5),
    ReactiveTest.OnNext(6000.Ms(), 6),
    ReactiveTest.OnNext(7000.Ms(), 7)
);
var input2 = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(1400.Ms(), "a"),
    ReactiveTest.OnNext(1500.Ms(), "b"),
    ReactiveTest.OnNext(1600.Ms(), "c"),
    ReactiveTest.OnNext(5500.Ms(), "d"),
    ReactiveTest.OnNext(5600.Ms(), "e"),
    ReactiveTest.OnNext(5700.Ms(), "f")
);

, который использует этот метод расширения:

public static class TickExtensions
{
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}

Проблема заключается вв основном проблема конечного автомата, которая включает в себя две наблюдаемые разных типов.Лучший способ решить эту проблему - использовать тип Discrimination Union , которого нет в C #, поэтому мы его создадим.Ответ @ Sentinel сделал это с помощью Tuple, и это также может сработать:

public class DUnion<T1, T2>
{
    public DUnion(T1 t1) 
    { 
        Type1Item = t1;
        Type2Item = default(T2);
        IsType1 = true;
    }

    public DUnion(T2 t2) 
    { 
        Type2Item = t2;
        Type1Item = default(T1);
        IsType1 = false;
    }

    public bool IsType1 { get; }
    public bool IsType2 => !IsType1;

    public T1 Type1Item { get; }
    public T2 Type2Item { get; }
}

Затем мы можем взять два наших потока разной типизации, Select и Merge, в один распознаваемый поток объединения,где мы можем управлять государством с Scan.Ваша логика состояния немного сложна, но выполнима:

  • , если приходит число и в очереди нет элементов, ничего не делать
  • , если число приходит и в очереди есть элементы, выпустить первый элемент в очереди.
    • Если имеется более одного элемента, удалите недавнее удаление из очереди.
    • Если в очереди есть только один элемент, не удаляйте его и переходите в «поддельное пустое» состояние.
  • если приходит строка, вставьте ее вочередь.
    • Если очередь является «поддельной пустой», извлеките последний элемент и выйдите из состояния «поддельной пустой».

Вот результат, наблюдаемый (используетПакет NuGet System.Collections.Immutable):

var result = input1.Select(i => new DUnion<int, string>(i))
    .Merge(input2.Select(s => new DUnion<int, string>(s)))
    .Scan((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), (state, dItem) => dItem.IsType1
        ? state.queue.IsEmpty   
            ? (state.queue, null, false, false)     //Is integer, but empty queue, so don't emit item
            : state.queue.Dequeue().IsEmpty //Is integer, at least one item: dequeue unless only one item, then emit either way
                ? (state.queue,           state.queue.Peek(), true,  true)
                : (state.queue.Dequeue(), state.queue.Peek(), false, true)
        : state.isFakeEmptyState //Is new string, just add to queue, don't emit
            ? (state.queue.Dequeue().Enqueue(dItem.Type2Item), null, false, false) 
            : (state.queue.Enqueue(dItem.Type2Item),   (string)null, false, false) 
    )
    .Where(t => t.emit)
    .Select(t => t.item);

Это можно проверить следующим образом:

var observer = scheduler.CreateObserver<string>();
result.Subscribe(observer);
scheduler.Start();
observer.Messages.Dump(); //Linqpad. Can replace with Console.Writeline loop.

Обновление : Я немного подумал об этоми я думаю, что имеет смысл бросить некоторых операторов вокруг функциональности Дискриминационного союза.Таким образом, вам не нужно явно иметь дело с типом:

public static class DUnionExtensions
{
    public class DUnion<T1, T2>
    {
        public DUnion(T1 t1)
        {
            Type1Item = t1;
            Type2Item = default(T2);
            IsType1 = true;
        }

        public DUnion(T2 t2)
        {
            Type2Item = t2;
            Type1Item = default(T1);
            IsType1 = false;
        }

        public bool IsType1 { get; }
        public bool IsType2 => !IsType1;

        public T1 Type1Item { get; }
        public T2 Type2Item { get; }
    }

    public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return a.Select(x => new DUnion<T1, T2>(x))
            .Merge(b.Select(x => new DUnion<T1, T2>(x)));
    }

    public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
            TState initialState,
            Func<TState, T1, TState> type1Handler,
            Func<TState, T2, TState> type2Handler)
        {
            return source.Scan(initialState, (state, u) => u.IsType1
                ? type1Handler(state, u.Type1Item)
                : type2Handler(state, u.Type2Item)
            );
        }
}

С этими методами расширения решение меняется на это, что, я думаю, читается лучше:

var result = input1
    .Union(input2)
    .ScanUnion((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), 
        (state, _) => state.queue.IsEmpty
            ? (state.queue, null, false, false)     //empty queue, so don't emit item
            : state.queue.Dequeue().IsEmpty         //At least one item: dequeue unless only one item, then emit either way
                ? (state.queue, state.queue.Peek(), true, true) //maintain last item, enter Fake-EmptyState
                : (state.queue.Dequeue(), state.queue.Peek(), false, true),
        (state, s) => state.isFakeEmptyState 
            ? (state.queue.Dequeue().Enqueue(s), null, false, false)
            : (state.queue.Enqueue(s), (string)null, false, false)
    )
    .Where(t => t.emit)
    .Select(t => t.item); 

Еслиу вас возникли проблемы с именованным синтаксисом кортежей, тогда вы можете использовать старые кортежи:

var result = input1
    .Union(input2)
    .ScanUnion(Tuple.Create(ImmutableQueue<string>.Empty, (string)null, false, false),
        (state, _) => state.Item1.IsEmpty
            ? Tuple.Create(state.Item1, (string)null, false, false)     //empty queue, so don't emit item
            : state.Item1.Dequeue().IsEmpty         //At least one item: dequeue unless only one item, then emit either way
                ? Tuple.Create(state.Item1, state.Item1.Peek(), true, true) //maintain last item, enter Fake-EmptyState
                : Tuple.Create(state.Item1.Dequeue(), state.Item1.Peek(), false, true),
        (state, s) => state.Item3
            ? Tuple.Create(state.Item1.Dequeue().Enqueue(s), (string)null, false, false)
            : Tuple.Create(state.Item1.Enqueue(s), (string)null, false, false)
    )
    .Where(t => t.Item4)
    .Select(t => t.Item2);
0 голосов
/ 18 мая 2018

Я люблю эти головоломки Rx.Не могу поверить, что за это платят.Поэтому я придумал немного другой подход.Я думаю, что здесь есть некоторые недостатки с условиями гонки, но мне было бы любопытно, что вы думаете и как их можно устранить.

Основная идея состоит в том, чтобы думать об очереди как о рекурсивном буфере до тех пор, пока не закончится источник1,где буфер воспроизводится в очередь без первого элемента.

ОБНОВЛЕНИЕ

Основываясь на наблюдении Шломо, что необходим publish (). Refcount (), я обновил код и превратил решение в расширение "RegulatedQueue".Пожалуйста, смотрите код ниже.Input2 является источником для регулирования через очередь, Input1 является регулирующим сигналом.

public static class RxHelpers
{
    public static IObservable<TInput2> RegulatedQueue<TInput1, TInput2>(this IObservable<TInput2> input2,
       IObservable<TInput1> input1
        )
    {
        return Observable.Using(() => new Subject<TInput2>(),
        queue =>
        {
            input2.Subscribe(queue);
            return queue
                .Buffer(() => input1)
                .Do(l => { foreach (var n in l.Skip(l.Count > 1 ? 1 : 0)) queue.OnNext(n); })
                .Where(l => l.Count > 0)
                .Select(l => l.First()).
                Publish().
                RefCount();
        });
    }
}


class Program
{


    static void Main(string[] args)
    {
        Random r = new Random();
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
        var source2 = Observable.Interval(TimeSpan.FromMilliseconds(2000)).Select(x => Enumerable.Range(1, 3).Select(y => r.Next(200)).ToObservable()).SelectMany(x => x).Publish().RefCount();

        source1.Subscribe(x => Console.WriteLine("Source1 " + x));
        source2.Subscribe(x => Console.WriteLine("Source2 " + x));

        var merged = source2.RegulatedQueue(source1);

        merged.Subscribe(x => Console.WriteLine("Merged1 " + x));
        merged.Subscribe(x => Console.WriteLine("Merged2 " + x));






        Console.ReadKey();

    }
}

OBSOLETE

  static void Main(string[] args)
    {
        Random r = new Random();
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
        var source2 = Observable.Interval(TimeSpan.FromMilliseconds(7000)).Select(x => Enumerable.Range(1, 3).Select(y => r.Next(200)).ToObservable()).SelectMany(x => x).Publish().RefCount();

        source1.Subscribe(x => Console.WriteLine("Source1 " + x));
        source2.Subscribe(x => Console.WriteLine("Source2 " + x));

        //THIS BIT
         Subject<int> queue = new Subject<int>();
        source2.Subscribe(queue);
        var merged=queue
            .Buffer(() => source1)
            .Do(l => { foreach (var n in l.Skip(l.Count > 1 ? 1 : 0)) queue.OnNext(n); })
            .Where(l=>l.Count > 0)
            .Select(l => l.First());





            merged.Subscribe(x => Console.WriteLine("Merged "+x));







        Console.ReadKey();

    }

Тест-код:

var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(1000.Ms(), 1),
    ReactiveTest.OnNext(2000.Ms(), 2),
    ReactiveTest.OnNext(3000.Ms(), 3),
    ReactiveTest.OnNext(4000.Ms(), 4),
    ReactiveTest.OnNext(5000.Ms(), 5),
    ReactiveTest.OnNext(6000.Ms(), 6),
    ReactiveTest.OnNext(7000.Ms(), 7)
);
var input2 = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(1400.Ms(), "a"),
    ReactiveTest.OnNext(1500.Ms(), "b"),
    ReactiveTest.OnNext(1600.Ms(), "c"),
    ReactiveTest.OnNext(5500.Ms(), "d"),
    ReactiveTest.OnNext(5600.Ms(), "e"),
    ReactiveTest.OnNext(5700.Ms(), "f")
);

Subject<string> queue = new Subject<string>();
input2.Subscribe(queue);
var result = queue
    .Buffer(() => input1)
    .Do(l => { foreach (var n in l.Skip(l.Count > 1 ? 1 : 0)) queue.OnNext(n); })
    .Where(l => l.Count > 0)
    .Select(l => l[0]);

result.Timestamp(scheduler)
    .Select(t => $"{t.Timestamp.Ticks} ticks: {t.Value}")
    .Dump(); //Linqpad

ожидаетсявывод:

//14000000 enqueue a
//15000000 enqueue b
//16000000 enqueue c
20000000 ticks: a 
30000000 ticks: b 
40000000 ticks: c 
50000000 ticks: c 
//55000000 enqueue d
//56000000 enqueue e
//57000000 enqueue f
60000000 ticks: c //should really be d, but there's no handling for fake-empty ejection
70000000 ticks: d 
80000000 ticks: e 
90000000 ticks: f 
100000000 ticks: f 
110000000 ticks: f 
120000000 ticks: f 
130000000 ticks: f 
140000000 ticks: f 
...

фактический вывод:

20000000 ticks: a 
30000000 ticks: b 
40000000 ticks: c 
50000000 ticks: b 
60000000 ticks: c 
70000000 ticks: b 
80000000 ticks: c 
90000000 ticks: c 
100000000 ticks: b 
110000000 ticks: c 
120000000 ticks: c 
130000000 ticks: b 
140000000 ticks: c 
150000000 ticks: b 
160000000 ticks: c 
170000000 ticks: b 
180000000 ticks: c 
190000000 ticks: c 
0 голосов
/ 18 мая 2018

Это бы сработало?Вероятно, есть лучший способ сделать эти буферы, поэтому, возможно, стоит пересмотреть это.

        Random r = new Random();
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
        var source2 = Observable.Interval(TimeSpan.FromMilliseconds(7000)).Select(x => Enumerable.Range(1, 3).Select(y => r.Next(200)).ToObservable()).SelectMany(x => x).Publish().RefCount();

        source1.Subscribe(x => Console.WriteLine("Source1 " + x));
        source2.Subscribe(x => Console.WriteLine("Source2 " + x));



        var merged =
            source2.Select(s2 => Tuple.Create(2, s2))
            .Merge(source1.Select(s1 => Tuple.Create(1, (int)s1)))
            .Scan(Tuple.Create((int?)null, new Queue<int>(),0), (state, val) =>
                 {
                     int? next = state.Item1;
                     if (val.Item1 == 1)
                     {
                         if (state.Item2.Count > 0)
                         {
                             next = state.Item2.Dequeue();
                         }
                     }
                     else
                     {
                         state.Item2.Enqueue(val.Item2);

                     }
                     return Tuple.Create(next, state.Item2,val.Item1);
                 })
            .Where(x=>(x.Item1!=null && x.Item3==1))
            .Select(x => x.Item1);



        merged.Subscribe(x => Console.WriteLine("Merged "+x));

ОБНОВЛЕНИЕ Фиксированный код для OP:

 public class DiscriminatedUnion
{
    public static IObservable<TInput2> Process<TInput1, TInput2>(
       IObservable<TInput1> input1,
        IObservable<TInput2> input2)
    {
        var merged =
                    input2.Select(s2 => Tuple.Create(2, (object)s2))
                    .Merge(input1.Select(s1 => Tuple.Create(1, (object)s1)))
                    .Scan(Tuple.Create(default(TInput2), new Queue<TInput2>(), 0), (state, val) =>
                    {
                        TInput2 next = state.Item1;
                        if (val.Item1 == 1)
                        {
                            if (state.Item2.Count > 0)
                            {
                                next = state.Item2.Dequeue();
                            }
                        }
                        else
                        {
                            state.Item2.Enqueue((TInput2)val.Item2);
                        }
                        return Tuple.Create(next, state.Item2, val.Item1);
                    })
                    .Where(x => (!x.Item1.Equals(default(TInput2)) && x.Item3 == 1))
                    .Select(x => x.Item1);
        return merged;
    }
}
...