Как использовать IObservable <int>для сегментирования IObservable <T>в IObservable <T []> различной длины - PullRequest
1 голос
/ 03 июля 2019

У меня есть «значения» IObservable<T>, которые возвращают T элементы, которые должны быть последовательно объединены в массивы переменной длины, и у меня есть «control» IObservable<int>, который сообщает мне, как долго должен быть следующий массив быть. Удаление элемента, его повторение или вывод результатов из строя приведут к тому, что результаты не будут иметь смысла.

Это для робототехнического проекта с последовательным подключением, который я переписываю в Rx.NET.

IObservable<char> values = new [] {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H' }.ToObservable();
IObservable<int> control = new [] { 1, 4, 2 }.ToObservable();
IObservable<char[]> result = control.SelectMany(length => values.Take(length).ToArray());

Я бы хотел увидеть что-то вроде этого:

values  ----A--B--C--D--E--F--G--H-->
control --1-----4---------------2--->
result  ---[A]---------[BCDE]--[FG]->

Но моя попытка до сих пор приводит к

-[A]-[AB]-[ABCD]->

Ответы [ 3 ]

1 голос
/ 07 июля 2019

Хорошо, вот код, который соответствует всем моим потребностям.Progman, вы сыграли важную роль в этой работе с вашим советом.Вот он, завернутый в аккуратный Observable.Create и превращенный в метод расширения на IObservable<T>, с одноразовым, который избавляется от подписки на заархивированную последовательность.

    public static IObservable<T[]> Chop<T>(this IObservable<T> values, IObservable<int> control) =>
        Observable.Create<T[]>(observer => 
        {
            List<T> buffer = new List<T>();
            return values.Zip(control.SelectMany(length => Enumerable.Repeat(length, length)), 
                              (value, length) => (value, length))
                         .Subscribe(next => 
                         {
                             buffer.Add(next.value);
                             if (buffer.Count == next.length)
                             {
                                 observer.OnNext(buffer.ToArray());
                                 buffer.Clear();
                             }
                         });
        });

Пример вывода:

values  ----A--B--C--D--E--F--G--H--I--J--K--L--M--N--O--P-->
control --1-4-2-0-3-3--------------------------------------->
result  ---[A]---------[BCDE]-[FG]----[HIJ]----[KLM]-------->
0 голосов
/ 04 июля 2019

Я не проверял это, но я полагаю, что вы можете объединить Zip() и Scan() для получения запрошенного вами результата.

IObservable<char> values = new [] {'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H' }.ToObservable();
IObservable<int> control = new [] { 1, 4, 2 }.ToObservable();
IObservable<char[]> result = control.Zip(
    control.Scan(values, (rest, length) => rest.Skip(length)),
    (length, vals) => vals.Take(length).ToArray()
);
0 голосов
/ 04 июля 2019

Вы можете создать несколько вспомогательных предметов для подготовки / создания новых наблюдаемых для создания новой наблюдаемой, которую вы хотите. Вы можете построить предметы для этих видов наблюдаемых:

  1. Создать новую наблюдаемую, которая повторяет уникальное значение, равное числу, считанному с control. От (1, 4, 2) вы получите (guid_1, guid_2, guid_2, guid_2, guid_2, guid_3, guid_3). Назовите это наблюдаемым repeatSize.
  2. Используйте оператор Zip(), чтобы объединить одно значение из repeatSize и values. Вы получите наблюдаемое со значениями: ((A,guid_1), (B,guid_2), (C,guid_2), (D,guid_2), (E,guid_2), (F,guid_3), (G,guid_3)). Назовите это наблюдаемым zippedValues.
  3. Подпишитесь на zippedValues и добавьте / добавьте оригинальное значение в список. Также сохраните предыдущее значение из наблюдаемой repeatSize. Сравните это с текущим значением repeatSize. Когда он был изменен (например, с guid_2 на guid_3), вы знаете, что достигли конца / начала, так что вы можете отправить заполненный список новой наблюдаемой. После этого вы снова сбрасываете список и снова начинаете его заполнять.

Возможно, вам потребуется создать 2-3 Subject<T> объектов, подписаться на них и использовать несколько вызовов OnNext(), чтобы заполнить их из подписки других наблюдаемых.

...