У меня есть наблюдаемая коллекция, которую я хочу обрабатывать параллельно, затем наблюдать обработанные значения во время фильтрации и, наконец, подписать обработчик, который получает отфильтрованные значения.
Мой пример синтаксически корректен и прекрасно компилируется, и когда я запускаю код, выполняется оператор Where
, выполняющий фильтрацию.Но в подписку не поступает никаких данных.Если я удаляю AsParallel
так, чтобы обработка выполнялась поверх обычного IEnumerable
, данные поступали и все работало как ожидалось.
Вот мой пример, выполняющий некоторую обработку строк:
// Generate some data every second
var strings = Observable.Generate(() =>
new TimeInterval<Notification<string>>(
new Notification<string>
.OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));
// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
select "Parallel " + value;
// Filter and observe
var data = String.Empty;
parallelStrings
.Where(value => !String.IsNullOrEmpty(value))
.ToObservable()
.Subscribe(value => data = value);
Следующая странная вещь заключается в том, что если я использую оператор TakeWhile
, который, на мой взгляд, концептуально похож на Where, наблюдение за ParallelQuery работает, как и ожидалось:
// Filter and observe
var data = String.Empty;
parallelStrings
.TakeWhile(cs => !String.IsNullOrEmpty(cs))
.ToObservable()
.Subscribe(value => data = value);
Добавление некоторого кода регистрации вподписка показывает, что данные принимаются до ToObservable
преобразования, но не после:
1. var data = String.Empty;
2. parallelStrings
3. .Where(value => !String.IsNullOrEmpty(value))
4. .Select(value => value)
5. .ToObservable()
6. .Select(value => value)
7. .Subscribe(value => data = value);
Точка останова в лямбда-строке в 4-й строке достигнута, а точка останова в лямбда-строке в 6-й строке никогда не достигнут.
Почему TakeWhile
отправляет данные абоненту, а Where
- нет?
Если это важно, я разрабатываю свой код в Visual Studio 2010 RC с проектом, ориентированным на клиентский профиль .Net 4.0 Framework.
Обновление : на основе @ Sergeys answer Я переделал расположение фильтра Where
.Следующий код работает должным образом:
var processedStrings = from value in strings
let processedValue = "Parallel " + value
where !String.IsNullOrEmpty(processedValue)
select processedValue;
var data = String.Empty;
processedStrings
.ToEnumerable()
.AsParallel()
.ToObservable()
.Subscribe(value => data = value );
Все еще немного неловко, когда нужно сначала преобразовать исходную наблюдаемую processedStrings
в перечисляемую для ее распараллеливания, а затем преобразовать обратно в наблюдаемуючтобы подписаться на конечный результат.