Есть ли способ узнать, когда обрабатывается OnNext? - PullRequest
1 голос
/ 15 сентября 2010

У меня есть конвейерная установка, все с использованием реактивных расширений, начиная с потока значений (может прибыть в любое время, когда они захотят), а затем, подписавшись на него, есть много разных «модулей», выводящих поток вычисляемых значений, и этот потоктакже подписан другим уровнем модулей, которые также вычисляют другие значения.

Теперь в конце всего этого есть один класс, который прослушивает значения всех этих модулей и выводит сам какой-то другой вид значения.

Теперь в обычной жизни первоначальный поток значений поступает всякий раз, один за другим.

Но я также хочу иметь возможность «воспроизвести» эти значения и получить все выходные данные, но на этот раз »как можно быстрее ".

Но мне все еще нужны эти значения, чтобы иметь смысл, т. е. чтобы конечный результат мог быть" восстановлен "до исходного значения, которое его сгенерировало.

Не уверен, еслиВ этом есть смысл.

Есть несколько вещей, которые я рассматриваю, но ни одна из которых не кажется действительно чистой.

Нужно иметь "acОтмеченное время ".Т.е. есть какая-то временная деформация и все переигрывание, скажем, со скоростью 100x или чем-то еще.Однако у этого есть некоторые существенные недостатки, так как при скорости 100x время обработки значения становится гораздо более значительным.

Другой вопрос, который меня интересует (и приводит к моему вопросу), - это возможность«пошагово» все это, т. е. чтобы иметь возможность узнать, когда OnNext полностью обработан всеми его подписчиками, чтобы я мог просто отправить одно значение, подождать, пока все его обработают, получить соответствующее выходное значение,а затем отправьте следующий.

Преимущество будет в том, что его можно будет воспроизвести "настолько быстро, насколько позволяет мой компьютер".

Мне было интересно, что-то не хватало,где-нибудь, или способ сделать то, что я упустил из виду, которое может легко сделать то, что я хочу.

В случае неудачи единственная идея, которую я мог придумать, состояла в том, чтобы иметь прокси / фасады для всех моих классов, чтобы перехватыватькаждые входящие и исходящие значения и уведомить об этом, чтобы мой «проигрыватель» смог отследить все это.


У меня естьВозможно, я сформулировал мой вопрос в плохой форме.Позвольте мне перефразировать.

Моя проблема не столько в «воспроизведении» значений, я могу сделать это просто отлично, даже если просто загрузить их с диска и поместить их в IEnumerable, тогда они будут поданыкак можно быстрее.

Моя проблема в том, что, поскольку все конвейеры являются многопоточными по своей природе, если я отправлю их просто так, как можно быстрее, у меня не будет возможности отследить, какое входное значениесгенерировал, какое выходное значение (в отличие от вашего примера, они, вероятно, будут связаны друг с другом).

Я думал о TimeStamped, но это бесполезно для меня по нескольким причинам:

  1. Разрешение TimeStamped в значительной степени соответствует разрешению DateTime.Now, то есть 10 мс, что недостаточно точно, и

  2. снова, при подходе, основанном на времени, если мой конвейердля обработки, скажем, 5 мсек, затем внезапно эти 5 мсек приобретают слишком много значения (например, через 5 мсек многие входные значения уже могли бы войти, что делает его невозможным).Не знаю, какой из них сгенерировал, какое выходное значение).

Я знаю, что все это звучит очень "расплывчато", и определенно не хватает небольшого образца, но я не могу поместить весь свой код втам, поскольку он слишком длинный и сложный, в какой-то момент я попытаюсь привести в действие небольшой пример.

Все это для "имитационной" среды, где я могу воспроизвести вчерашние значения и проследить, что произошло, икогда, но я мог бы воспроизвести их быстрее.

Большое спасибо за ваш ответ, хотя.

Ответы [ 2 ]

1 голос
/ 06 февраля 2012

Если вы хотите идеально воспроизвести вашу многопоточную среду с воспроизведением, это будет либо просто, либо невозможно.(Глупое утверждение я знаю).1) Если ваш рабочий процесс вводит параллелизм, то есть у вас есть несколько планировщиков или побочные эффекты многопоточности (плохие) между исходной последовательностью, производящей значение, и конечным потреблением значения;тогда тебе не повезет.После того, как вы добавите планирование и потоки, вы не получите точность, которую вы ищете (5 мс).2) Если, однако, вы применяете планирование только конечным потребителем (т. Е. Используете SubScribeOn & ObserveOn только один раз во всей цепочке), и вы также избегаете любых конструкций потоков / планирования или побочных эффектов в своем конвейере;тогда вас ждут хорошие новости.

Следуйте рекомендациям, которые вы вводите только для параллельного / календарного планирования для конечного подписчика, и вы можете легко использовать VirtualScheduler / TestScheduler для контроля времени.Это должно работать как мечта, и вы можете ускорить время, но при этом можете наслаждаться роскошью последовательных гарантий Rx.

Эти ссылки на мой блог тоже могут помочь:

http://leecampbell.blogspot.com/2010/06/rx-part-6-scheduling-and-threading.html

http://leecampbell.blogspot.com/2010/10/rx-part-8-testing-rx.html

0 голосов
/ 15 сентября 2010

Если я понимаю ваши требования здесь, вы хотите иметь возможность воспроизвести ваш наблюдаемый поток значений без первоначальной задержки между каждым отправленным значением.Если это правильно, то разве это не тот случай, когда можно использовать метод расширения Replay?

IConnectableObservable<TSource>
    Replay<TSource>(this IObservable<TSource> source);

Вот тестовый код, который я собрал, чтобы проверить поведение:

var sw = new System.Diagnostics.Stopwatch();
sw.Start();

Action<string, int> writeLine = (t, n) =>
    Console.WriteLine("{0}: {1} @ {2} seconds",
        t, n,
        sw.Elapsed.TotalSeconds.ToString("0.000"));

var source = Observable
    .GenerateWithTime<int, int>
    (0, n => n < 5, n => n + 1, n => n,
    n => TimeSpan.FromSeconds(1.0));

var replay = source.Replay();

using (var replayConnect = replay.Connect())
{
    Action sourceFinally = () =>
    {
        replay.Run(n => writeLine("Replay", n));
    };
    source.Finally(sourceFinally).Run(n => writeLine("Source", n));
}

Результат выполнения этого кода:

Source: 0 @ 1.094 seconds
Source: 1 @ 2.115 seconds
Source: 2 @ 3.129 seconds
Source: 3 @ 4.143 seconds
Source: 4 @ 5.158 seconds
Replay: 0 @ 5.199 seconds
Replay: 1 @ 5.201 seconds
Replay: 2 @ 5.201 seconds
Replay: 3 @ 5.201 seconds
Replay: 4 @ 5.201 seconds

Если вам нужно знать, когда были выдвинуты исходные значения, вы можете использовать метод расширения Timestamp, чтобы изменить наблюдаемое с IObservable<T> на IObservable<Timestamped<T>>.

IObservable<Timestamped<TSource>>
    Timestamp<TSource>(this IObservable<TSource> source);

Не могу сказать, курил ли ты слишком много, но поможет ли это "детоксикации"?; -)

...