Реактивные расширения: изменение существующего потока, который обрабатывается медленно - PullRequest
0 голосов
/ 20 сентября 2018

Я изучаю Rx и пытаюсь перевести следующую проблему в конвейер Rx.Похоже, что для этого должно быть простое решение Rx, но я не могу его найти.Вот простой код C # для демонстрации проблемы:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Item = System.Collections.Generic.KeyValuePair<int, string>;

namespace Sample
{
    class Test
    {
        readonly object _sync = new object();

        readonly List<Item> _workList = new List<Item>();

        public void Update(IEnumerable<Item> items)
        {
            lock(_sync)
            {
                foreach (var item in items)
                {
                    bool found = false;
                    for (int i = 0; i < _workList.Count; ++i)
                    {
                        if (_workList[i].Key == item.Key)
                        {
                            _workList[i] = item;
                            found = true;
                            break;
                        }
                    }

                    if (!found)
                    {
                        _workList.Add(item);
                    }
                }
            }
        }

        public void Run()
        {
            void ThreadMethod(object _)
            {
                while (true)
                {
                    Item? item = null;

                    lock (_sync)
                    {
                        if (_workList.Any())
                        {
                            item = _workList[0];
                            _workList.RemoveAt(0);
                        }
                    }

                    if (item.HasValue)
                    {
                        var str = $"{item.Value.Key} : {item.Value.Value}";

                        Console.WriteLine($"Start {str}");
                        Thread.Sleep(5000); // simluate work
                        Console.WriteLine($"End {str}");
                    }
                }
            }

            var thread = new Thread(ThreadMethod);
            thread.Start();
        }
    }
}

Событие 'update' состоит из списка пар ключ / значение.Обновление объединено с существующим списком со следующими правилами. Не гарантируется, что каждый известный ключ будет появляться в каждом обновлении

  • Если ключ найден, значение заменяется на текущую позицию в списке.Предыдущее значение будет отброшено и не будет обработано.
  • Если ключ не найден, элемент добавляется в конец списка

Отдельный поток обрабатывает список один предмет за раз .Эта обработка занимает некоторое время (имитируется Thread.sleep).Элементы удаляются из начала списка, когда они обрабатываются.

Как вы можете видеть, во время обработки отдельного элемента элементы в отставании могут видоизменяться на месте.Дело в том, что для каждого ключа будет обрабатываться только самое последнее полученное значение, но порядок ключей в резерве не может быть изменен (кроме случаев, когда ключ обрабатывается, он удаляется из списка. Если ключ повторно вводится в список, ондобавлено в конец).

Моя последняя попытка с Rx состояла в том, чтобы передать обновление в функцию сканирования, которая превратила ранее неизвестные ключи в темы, а затем ввести новые значения для каждой клавиши в соответствующую тему перед объединением всех последнихзначения, но это не совсем работает.

Пожалуйста, воздержитесь от обсуждения решений не-Rx.Простой код, приведенный выше, сделает эту работу, но я хотел бы узнать, есть ли решение Rx.

Я работаю в C # (System.Reactive), но я с радостью приму решения на других диалектахRx.

Ответы [ 2 ]

0 голосов
/ 21 сентября 2018

Это будет работать, хотя я не самый большой поклонник этого.

Я рассматривал это как ситуацию производителя / потребителя: один поток создает работу, а другой - выполняет.Тема producer представляет поток, который добавляет работу.Все остальное представляет потребительскую сторону вещей.Если бы вы собрались class, то producer перешло бы в один класс, а все остальное в другой.

completedKeys содержит ключи, которые выполнены, поэтому состояние для этого ключа будет выброшено.: новый элемент с этим ключом перейдет в конец строки.readyGate представляет, когда потребитель вновь доступен для работы над следующей вещью.Объединение этого с последним из того, над чем работать, является сложной частью.WithLatestFrom прекрасно работает, пока вы не получите пустой список..Where().FirstAsync() прекрасно выполняет ожидающую часть.

Ключом ко всему этому является GroupByUntil: это группирует вещи, и они естественным образом попадают в порядок, в котором ключи были добавлены первыми, то есть то, что вам нужно.Предложение Until означает, что мы можем закрыть наблюдаемое, что приведет к тому, что новый элемент со старым ключом окажется в конце строки.DynamicCombinedLatest превращает все эти наблюдаемые в список, который фактически является вашим состоянием.

В любом случае, вы идете:

var producer = new Subject<Item>();
var readyGate = new Subject<Unit>();
var completedKeys = new Subject<int>();

var Process = new Action<Item>(kvp =>
{
    var str = $"{kvp.Key} : {kvp.Value}";

    Console.WriteLine($"Start {str}");
    Thread.Sleep(500); // simluate work
    Console.WriteLine($"End {str}");
});

var groups = producer
    .GroupByUntil(kvp => kvp.Key, kvp => kvp, go => completedKeys.Where(k => k == go.Key))
    .DynamicCombineLatest();

var q = groups.Publish(_groups => readyGate
        .ObserveOn(NewThreadScheduler.Default)
        .WithLatestFrom(groups, (_, l) => l)
        .SelectMany(l => l.Count == 0
            ? _groups.Where(g => g.Count > 0).FirstAsync()
            : Observable.Return(l)
        )
    )
    .Subscribe(l =>
    {
        var kvp = l[0];
        completedKeys.OnNext(kvp.Key);
        Process(kvp);
        readyGate.OnNext(Unit.Default);
    });


//Runner code:
producer.OnNext(new Item(1, "1-a"));
producer.OnNext(new Item(1, "1-b"));

producer.OnNext(new Item(2, "2-a"));
producer.OnNext(new Item(2, "2-b"));

readyGate.OnNext(Unit.Default);

await Task.Delay(TimeSpan.FromMilliseconds(100)); //to test if 1 gets done again and goes to the back of the line.
producer.OnNext(new Item(1, "1-c"));

И DynamicCombinedLatest это (использует пакет nugetSystem.Collections.Immutable):

public static IObservable<List<T>> DynamicCombineLatest<T>(this IObservable<IObservable<T>> source)
{
    return source
        .SelectMany((o, i) => o.Materialize().Select(notification => (observableIndex: i, notification: notification)))
        .Scan((exception: (Exception)null, dict: ImmutableDictionary<int, T>.Empty), (state, t) => t.notification.Kind == NotificationKind.OnNext
            ? ((Exception)null, state.dict.SetItem(t.observableIndex, t.notification.Value))
            : t.notification.Kind == NotificationKind.OnCompleted
                ? ((Exception)null, state.dict.Remove(t.observableIndex))
                : (t.notification.Exception, state.dict)
        )
        .Select(t => t.exception == null
            ? Notification.CreateOnNext(t.dict)
            : Notification.CreateOnError<ImmutableDictionary<int, T>>(t.exception)
        )
        .Dematerialize()
        .Select(dict => dict.OrderBy(kvp => kvp.Key).Select(kvp => kvp.Value).ToList());
}
0 голосов
/ 20 сентября 2018

Есть два механизма, которые вам понадобятся для достижения вашей цели.Первая - это карта, которая дает вам последнее значение испущенного элемента.Второй оператор flatMap().

Map<String, String> currentSourceValue = new HashMap<>();

Я использую String в качестве типа данных и методы keyOf() и valOf().

Этот метод обновит картус последним значением.Если текущее значение уже было, замените его и верните наблюдаемую empty().

synchronized Observable<String> setLatestValue( String s ) {
    String r = currentSourceValue.put( keyOf( s ), valOf( s ) );
    return r == null ? Observable.just( s ) : Observable.empty();
}

Этот метод извлечет значение из карты, если оно может быть передано.

synchronized Observable<String> getLatestValue( String s ) {
    String r = currentSourceValue.remove( keyOf( s ) );
    return r == null ? Observable.empty() : Observable.just( r );
}

Это позволит выдавать последнее значение

source
 .flatMap( s -> setLatestValue( s ) )
 .observeOn( processingScheduler )
 .flatMap( s -> getLatestValue( s ), 1 )
 .subscribe( s -> process( s ) );

Первый оператор flatMap() обновляет последнее значение входящего потока.Если в очереди уже есть элемент для этого ключа, возвращается наблюдаемое empty(), поэтому в цепочке ниже не используется пространство.

Второй оператор flatMap() работает в потоке обработки.Второй параметр flatMap() говорит, что элементы должны обрабатываться по одному, без параллелизма.Он выдаст значение, если оно присутствует на карте, или не даст значения, если его нет, и очистит запись карты.Теоретически, второй flatMap() может просто выдавать значение, но есть некоторая недетерминированность, когда цепочка наблюдателей переходит от одного потока к другому в восходящем направлении.

Ключевое слово synchronized указывает, что действия на картеявляются атомарными и предотвращают удаление значения из карты вниз по течению, так же, как оно было добавлено к карте вверх по течению.

Это решение работает аналогично оператору groupBy(), но онообрабатывает ситуацию, когда вы хотите обработать только последнее значение для данного ключа.

...