Список отправка Fun c в качестве параметра параллельно, и Fun c, выполненный несколько раз, приводит к дублированию некоторых списков, а другие пропускаются - PullRequest
0 голосов
/ 16 января 2020

У нас есть метод Execute, который вызывается параллельно так:

    ListOfLists.Select(ids => Observable.FromAsync(() => Execute(request, ids))).Merge(10);

И у нас есть

    protected Task<string> Execute(HttpRequestType request, IEnumerable<TItem> ids)
    {
        return SomeFunction(() => CreateRequest(request, ids.ToList()));
    }

CreateRequest (request, ids.ToList ()) возвращает HttpRequestMessage и SomeFunction принимает Func<HttpRequestMessage>.

И

    private async Task<T> SomeFunction(Func<HttpRequestMessage> func)
    {
        var request = func();
        var retryCount = 0;
        T result = null;
        for (; retryCount < MaxRetries; retryCount++)
        {
            try
            {
                result = DoSomethingWithRequest(request);
                if(result != null) break;
            }
            catch
            {
                //log here     
            }
            finally
            {
                request = func();    
            }
        }
        return result;
    }

Теперь мы увидели, что когда у нас более 10 списков в ListOfLists (поэтому только 10 выполняются в любое время, а остальные ждут) и DoSomethingWithRequest прерывается несколько раз с перерывами, некоторые вызовы SomeFunction дублируются, а некоторые списки идентификаторов отбрасываются. Есть ли что-то в коде выше, что вызывает это?

Прошу прощения за не столь описательный заголовок.

Спасибо, Сид.

Редактировать:

    private HttpRequestMessage CreateRequest(HttpRequestType request, List<string> ids)
    {
        if (request == null) return null;
        request.SomeProperty = toList;
        return ConvertoToHttpRequestMessage(request); //This just does some serialization and adds a fresh request Id and headers
    }

Ответы [ 2 ]

2 голосов
/ 16 января 2020

Похоже, что у вас есть состояние гонки с экземпляром request, который вы передаете функции Execute.

// The request instance must be created before this line, and you're passing the same
// instance to each call of Execute.
ListOfLists.Select(ids => Observable.FromAsync(() => Execute(request, ids))).Merge(10);

CreateRequest не создает новый экземпляр HttpRequestType , он просто изменяет тот, который передается Execute. И поскольку каждый поток Execute работает с одним и тем же экземпляром HttpRequestType, они просто перезаписывают друг друга.

Итак, может происходить что-то подобное:

Thread A Запускается и параметр ids равен [1, 2, 3]. Thread A вводит SomeFunction и звонит func. Параметр toList, полученный для Thread A, равен [1, 2, 3], поэтому request.SomeProperty устанавливается на [1, 2, 3], затем HttpRequest создается с [1, 2, 3] в заголовке где-то и возвращается к SomeFunction. DoSomethingWithRequestFails для Thread A.

В то же время, Thread B запущен. Параметр ids равен [4, 5, 6]. Thread B вводит SomeFunction и звонит func. Thread B устанавливает request.SomeProperty на [4, 5, 6], затем вызывает ConvertToHttpRequestMessage.

Теперь, прежде чем Thread B сможет создать HttpRequest, Thread A (который не удался), входит в блок finally в SomeFunction и снова вызывает func. Thread A устанавливает request.SomeProperty обратно на [1, 2, 3], и поскольку Thread A и Thread B оба мутируют в одном и том же экземпляре HttpRequestType, Thread B теперь имеет [1, 2, 3] в request.SomeProperty.

И Thread A, и Thread B создают HttpRequest с [1, 2, 3] в заголовке. Список идентификаторов [1, 2, 3] дублируется, а список [4, 5, 6] никогда не отправляется.

Попробуйте передать toList из CreateRequest в ConvertToHttpRequestMessage вместо просто HttpRequestType или создать новый экземпляр HttpRequestType для каждого вызова Execute.

0 голосов
/ 16 января 2020

Я не вижу, где вы начинаете параллельное выполнение.

Если вы используете System.Parallel, вы можете отправить ParallelOptions. Вот пример вызова Parallel.ForEach()

List<string> myList = new List<string>();

Parallel.ForEach(
    myList,
    new ParallelOptions()
    {
        MaxDegreeOfParallelism = 1337 // Here we allow 1337 parallel executions
    },
    (i) => { /* do something */ });
...