Почему ParallelQuery <T>. Где не работает при конвертации в Observable? - PullRequest
3 голосов
/ 18 февраля 2010

У меня есть наблюдаемая коллекция, которую я хочу обрабатывать параллельно, затем наблюдать обработанные значения во время фильтрации и, наконец, подписать обработчик, который получает отфильтрованные значения.

Мой пример синтаксически корректен и прекрасно компилируется, и когда я запускаю код, выполняется оператор Where, выполняющий фильтрацию.Но в подписку не поступает никаких данных.Если я удаляю AsParallel так, чтобы обработка выполнялась поверх обычного IEnumerable, данные поступали и все работало как ожидалось.

Вот мой пример, выполняющий некоторую обработку строк:

// Generate some data every second
var strings = Observable.Generate(() =>
    new TimeInterval<Notification<string>>(
        new Notification<string>
            .OnNext(DateTime.Now.ToString()), TimeSpan.FromSeconds(1)));

// Process the data in parallel
var parallelStrings = from value in strings.ToEnumerable().AsParallel()
                      select "Parallel " + value;

// Filter and observe
var data = String.Empty;
parallelStrings
    .Where(value => !String.IsNullOrEmpty(value))
    .ToObservable()
    .Subscribe(value => data = value);

Следующая странная вещь заключается в том, что если я использую оператор TakeWhile, который, на мой взгляд, концептуально похож на Where, наблюдение за ParallelQuery работает, как и ожидалось:

// Filter and observe
var data = String.Empty;
parallelStrings
    .TakeWhile(cs => !String.IsNullOrEmpty(cs))
    .ToObservable()
    .Subscribe(value => data = value);

Добавление некоторого кода регистрации вподписка показывает, что данные принимаются до ToObservable преобразования, но не после:

1.    var data = String.Empty;
2.    parallelStrings
3.        .Where(value => !String.IsNullOrEmpty(value))
4.        .Select(value => value)
5.        .ToObservable()
6.        .Select(value => value)
7.        .Subscribe(value => data = value);

Точка останова в лямбда-строке в 4-й строке достигнута, а точка останова в лямбда-строке в 6-й строке никогда не достигнут.

Почему TakeWhile отправляет данные абоненту, а Where - нет?

Если это важно, я разрабатываю свой код в Visual Studio 2010 RC с проектом, ориентированным на клиентский профиль .Net 4.0 Framework.

Обновление : на основе @ Sergeys answer Я переделал расположение фильтра Where.Следующий код работает должным образом:

var processedStrings = from value in strings
                       let processedValue = "Parallel " + value
                       where !String.IsNullOrEmpty(processedValue)
                       select processedValue;

var data = String.Empty;
processedStrings
    .ToEnumerable()
    .AsParallel()
    .ToObservable()
    .Subscribe(value => data = value );

Все еще немного неловко, когда нужно сначала преобразовать исходную наблюдаемую processedStrings в перечисляемую для ее распараллеливания, а затем преобразовать обратно в наблюдаемуючтобы подписаться на конечный результат.

Ответы [ 2 ]

2 голосов
/ 18 февраля 2010

Из C # 4.0 в двух словах :


В настоящее время существуют некоторые практические ограничения на то, что PLINQ может распараллеливать. Эти ограничения могут ослабнуть с последующими пакетами обновлений и версиями Framework. Следующие операторы запроса не позволяют распараллелить запрос, если только исходные элементы находятся в исходной позиции индексации:

  • Take, TakeWhile, Skip и SkipWhile
  • Индексированные версии Select, SelectMany и ElementAt

Большинство операторов запросов изменяют позицию индексации элементов (включая те, которые удалить элементы, такие как Где). Это означает, что если вы хотите использовать предыдущее операторы, они обычно должны быть в начале запроса


Таким образом, фактически использование TakeWhile предотвращает распараллеливание .AsParallel (). Трудно сказать , почему Где убивает подписку, но если поставить ее перед AsParallel , это может решить проблему.

2 голосов
/ 18 февраля 2010

TakeWhile концептуально не эквивалентно Where, потому что это зависит от порядка.Я подозреваю, что запрос на самом деле выполняется последовательно (см. этот пост ).Попробуйте позвонить .WithExecutionMode(ParallelExecutionMode.ForceParallelism) в вашем TakeWhile примере, и я подозреваю, что вы увидите тот же результат.

Хотя я не знаю, почему он не работает в параллельном случае ... могу ли я предположить, чтоВы вводите некоторые записи, чтобы увидеть, как далеко данные доходят?Вы можете выполнить полезную регистрацию с помощью Select, который возвращает оригинальный элемент после регистрации, например.

...