Переписывание foreach с использованием IObservable и Reactive Framework - PullRequest
1 голос
/ 04 февраля 2010

Я в VS2008 с Entity Framework. Я обращаюсь к объектам из базы данных, используя esql для функциональности WHERE IN. Я передаю тонну идентификаторов в оператор выбора, поэтому я делю его на наборы по 800. Затем я объединяю результаты из каждого фрагмента. Моя цель - получать результаты для каждого куска параллельно, а не ждать синхронно. Я установил Reactive Framework и уверен, что мне нужно использовать ForkJoin. Однако я не могу понять, как преобразовать эту функцию, чтобы использовать ее. Вот мой существующий код:

    public static IList<TElement> SelectWhereIn<TElement, TValue>(this ObjectContext context, string fieldName, IList<TValue> idList)
    {
        var chunkedIds = idList.Split(CHUNK_SIZE);
        string entitySetName = typeof(TElement).Name + "Set";
        var retList = new List<TElement>();
        foreach (var idChunk in chunkedIds)
        {
            string delimChunk = string.Join(",", idChunk.Select(x => x.ToString()).ToArray());
            ObjectQuery<TElement> query = context.CreateQuery<TElement>("SELECT VALUE x FROM " + entitySetName + " AS x");
            query = query.Where("it." + fieldName + " IN {" + delimChunk + "}");
            retList.AddRange(query);
        }
        return retList;
    }

Спасибо! * * 1004

РЕДАКТИРОВАТЬ >>> Я изменил код, чтобы использовать Бедного человека, как показано ниже:

    public static IList<TElement> SelectWhereIn<TElement, TValue>(this ObjectContext context, string fieldName, IList<TValue> idList)
    {
        var chunkedIds = idList.Split(CHUNK_SIZE);
        string entitySetName = typeof(TElement).Name + "Set";
        var chunkLists = new List<IEnumerable<TElement>>();
        Parallel.ForEach(chunkedIds, idChunk =>
        {
            string delimChunk = string.Join(",", idChunk.Select(x => x.ToString()).ToArray());
            ObjectQuery<TElement> query = context.CreateQuery<TElement>("SELECT VALUE x FROM " + entitySetName + " AS x");
            query = query.Where("it." + fieldName + " IN {" + delimChunk + "}");
            chunkLists.Add(query.ToList());
        });
        var retList = new List<TElement>();
        foreach (var chunkList in chunkLists)
        {
            retList.AddRange(chunkList);
        }
        return retList;
    }

С первого раза все заработало. Но во второй раз, когда я его запустил, я получил эту ошибку:

Соединение не было закрыто. Текущее состояние соединения подключается. Описание: во время выполнения текущего веб-запроса произошло необработанное исключение. Пожалуйста, просмотрите трассировку стека для получения дополнительной информации об ошибке и о том, где она возникла в коде.

Сведения об исключении: System.InvalidOperationException: Соединение не было закрыто. Текущее состояние соединения - это соединение.

Ошибка источника:

Строка 49: foreach (var iAsyncResult в resultList) Строка 50: { Строка 51: del.EndInvoke (iAsyncResult); Строка 52: iAsyncResult.AsyncWaitHandle.Close (); Строка 53:}

Это интересно, потому что Эм / автор книги (автор библиотеки) отредактировал свой оригинальный пост, рассказывая о том, как он добавил эти строки кода для большей безопасности. Я правильно это использую? Или его v1 все-таки безопаснее?

1 Ответ

1 голос
/ 04 февраля 2010

VS2010 имеет это с PLINQ. Использование расширений AsParallel().WithDegreeOfParallelism(nbProcessors) будет делать то, что вам нужно.

В VS2008 я использовал Parallel.ForEach для Бедного Человека, итератор от Emre Aydinceren В прошлом, когда я пытался обойти узкое место в производительности, попытался сделать это.

РЕДАКТИРОВАТЬ: В ответ на ошибку, которую вы добавили, это может быть случайный выстрел в темноте, но отдельные контексты для каждого потока? Вот так:

Parallel.ForEach(chunkedIds, idChunk =>
    {
        ObjectContext context = new MyContext(connStr);//depending what's your config
                                                       // like, with or w/o conn string

        string delimChunk = string.Join(",", idChunk.Select(x => x.ToString()).ToArray());
        ObjectQuery<TElement> query = context.CreateQuery<TElement>("SELECT VALUE x FROM " + entitySetName + " AS x");
        query = query.Where("it." + fieldName + " IN {" + delimChunk + "}");
        chunkLists.Add(query.ToList());
    });

Возможно, вам придется настроить некоторые вещи (например, взять строку подключения из расширенного контекста для создания новых контекстов).

...