Есть ли способ регулирования потоков, используемых библиотекой параллельных задач? - PullRequest
4 голосов
/ 23 августа 2010

Я использую TPL, но нахожу хитрым код модульного тестирования, который его использует.

Я пытаюсь не вводить оболочку поверх него, как я чувствуюможет вызвать проблемы.

Я понимаю, что вы можете установить привязку процессора в TPL, но было бы неплохо установить максимум потока (вероятно, для домена приложения).Следовательно, при установке максимального значения потока в 1 TPL будет вынужден использовать любой поток, в котором он использовался.

Что вы думаете?Возможно ли это (я уверен, что это не так), и должно это возможно?

Редактировать: вот пример

public class Foo
{
    public Foo( )
    {
        Task.Factory.StartNew( () => somethingLong( ) )
            .ContinueWith( a => Bar = 1 ) ;
    }
}

[Test] public void Foo_should_set_Bar_to_1( )
{
    Assert.Equal(1, new Foo( ).Bar ) ;
}

Тест , вероятно, не пройдет, если я не введу задержку.Я хотел бы иметь что-то вроде Task.MaximumThreads=1, чтобы TPL запускался последовательно.

Ответы [ 3 ]

4 голосов
/ 02 сентября 2010

Вы можете создать свой собственный класс TaskScheduler, производный от TaskScheduler, передать его в TaskFactory. Теперь вы можете иметь любые Task объекты, которые вы создаете, запускать с этим планировщиком.

Нет необходимости настраивать его на использование одного потока.

Тогда, прямо перед вашими утверждениями, просто наберите Dispose(). Внутренне это будет происходить примерно так, если вы будете следовать образцам для записи TaskScheduler: -

public void Dispose()
{
    if (tasks != null)
    {
        tasks.CompleteAdding();

        foreach (var thread in threads) thread.Join();

        tasks.Dispose();
        tasks = null;
    }
}

Это будет гарантировать, что все задачи были выполнены. Теперь вы можете двигаться вперед с вашими утверждениями.

Вы также можете использовать ContinueWith(...) для добавления утверждений после выполнения Задачи, если хотите проверить прогресс по мере происходящего.

2 голосов
/ 02 сентября 2010

Действительно, это больше проблема с тестируемостью лямбда-тяжелого кода, чем с TPL. Предложение Hightechrider является хорошим, но по сути ваши тесты все еще тестируют TPL так же, как и ваш код. Вам не нужно проверять это, когда первая задача заканчивается и ContinueWith запускает следующую задачу.

Если код внутри ваших лямбд значительно больше, то его использование в более тестируемом методе с четко определенными параметрами может сделать код более легким для чтения и более тестируемым. Вы можете написать модульные тесты вокруг этого. Там, где это возможно, я пытаюсь ограничить или убрать параллелизм из моих модульных тестов.

Сказав, что я хотел посмотреть, сработает ли подход планировщика. Вот реализация с использованием модифицированного StaTaskScheduler из http://code.msdn.microsoft.com/ParExtSamples

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    using Xunit;

    namespace Example
    {
      public class Foo
      {
        private TaskScheduler _scheduler;

    public int Bar { get; set; }

    private void SomethingLong()
    {
      Thread.SpinWait(10000);
    }

    public Foo()
      : this(TaskScheduler.Default)
    {
    }

    public Foo(TaskScheduler scheduler)
    {
      _scheduler = scheduler;
    }

    public void DoWork()
    {
      var factory = new TaskFactory(_scheduler);

      factory.StartNew(() => SomethingLong())
      .ContinueWith(a => Bar = 1, _scheduler);
    }
  }

  public class FooTests
  {
    [Fact]
    public void Foo_should_set_Bar_to_1()
    {
      var sch = new StaTaskScheduler(3);
      var target = new Foo(sch);
      target.DoWork();

      sch.Dispose();
      Assert.Equal(1, target.Bar);
    }
  }

  public sealed class StaTaskScheduler : TaskScheduler, IDisposable
  {
    /// <summary>Stores the queued tasks to be executed by our pool of STA threads.</summary>
    private BlockingCollection<Task> _tasks;
    /// <summary>The STA threads used by the scheduler.</summary>
    private readonly List<Thread> _threads;

    /// <summary>Initializes a new instance of the StaTaskScheduler class with the specified concurrency level.</summary>
    /// <param name="numberOfThreads">The number of threads that should be created and used by this scheduler.</param>
    public StaTaskScheduler(int numberOfThreads)
    {
      // Validate arguments
      if (numberOfThreads < 1) throw new ArgumentOutOfRangeException("concurrencyLevel");

      // Initialize the tasks collection
      _tasks = new BlockingCollection<Task>();

      // Create the threads to be used by this scheduler
      _threads = Enumerable.Range(0, numberOfThreads).Select(i =>
      {
        var thread = new Thread(() =>
        {
          // Continually get the next task and try to execute it.
          // This will continue until the scheduler is disposed and no more tasks remain.
          foreach (var t in _tasks.GetConsumingEnumerable())
          {
            TryExecuteTask(t);
          }
        });
        thread.IsBackground = true;
        // NO STA REQUIREMENT!
        // thread.SetApartmentState(ApartmentState.STA);
        return thread;
      }).ToList();

      // Start all of the threads
      _threads.ForEach(t => t.Start());
    }

    /// <summary>Queues a Task to be executed by this scheduler.</summary>
    /// <param name="task">The task to be executed.</param>
    protected override void QueueTask(Task task)
    {
      // Push it into the blocking collection of tasks
      _tasks.Add(task);
    }

    /// <summary>Provides a list of the scheduled tasks for the debugger to consume.</summary>
    /// <returns>An enumerable of all tasks currently scheduled.</returns>
    protected override IEnumerable<Task> GetScheduledTasks()
    {
      // Serialize the contents of the blocking collection of tasks for the debugger
      return _tasks.ToArray();
    }

    /// <summary>Determines whether a Task may be inlined.</summary>
    /// <param name="task">The task to be executed.</param>
    /// <param name="taskWasPreviouslyQueued">Whether the task was previously queued.</param>
    /// <returns>true if the task was successfully inlined; otherwise, false.</returns>
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
      // Try to inline if the current thread is STA
      return
      Thread.CurrentThread.GetApartmentState() == ApartmentState.STA &&
      TryExecuteTask(task);
    }

    /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
    public override int MaximumConcurrencyLevel
    {
      get { return _threads.Count; }
    }

    /// <summary>
    /// Cleans up the scheduler by indicating that no more tasks will be queued.
    /// This method blocks until all threads successfully shutdown.
    /// </summary>
    public void Dispose()
    {
      if (_tasks != null)
      {
        // Indicate that no new tasks will be coming in
        _tasks.CompleteAdding();

        // Wait for all threads to finish processing tasks
        foreach (var thread in _threads) thread.Join();

        // Cleanup
        _tasks.Dispose();
        _tasks = null;
      }
    }
  }
}
1 голос
/ 11 октября 2010

Если вы хотите избавиться от необходимости перегружать конструктор, вы можете заключить код модульного теста в Task.Factory.ContinueWhenAll (...).

public class Foo
{
    public Foo( )
    {
        Task.Factory.StartNew( () => somethingLong( ) )
            .ContinueWith( a => Bar = 1 ) ;
    }
}

[Test] public void Foo_should_set_Bar_to_1( )
{
    Foo foo;
    Task.Factory.ContinueWhenAll(
        new [] {
            new Task(() => {
                foo = new Foo();
            })
        },
        asserts => { 
            Assert.Equal(1, foo.Bar ) ;
        }
    ).Wait;
}

Хотелось бы услышать отзывы об этом подходе.

...