C # Многоразовые или постоянные задачи, которые ведут себя как потоки - PullRequest
1 голос
/ 23 мая 2019

С помощью потоков вы можете создавать постоянные, многократно используемые локальные переменные, которые полезны для таких вещей, как клиентские подключения. Тем не менее, с такими задачами, как ActionBlock из System.Threading.Tasks.Dataflow, кажется, что нет никакого вида сохранения или повторного использования блока действия. Так что для ActionBlock, который предполагает взаимодействие с клиентом, я понимаю, что вам нужно либо инициализировать клиентское соединение с нуля, либо повторно использовать его в более широкой области (с блокировкой?).

Вариант использования: я использую библиотеку .NET, которая инвертирует управление. Основная часть логики (кроме запуска и выключения) должна находиться в одном методе Task с именем ProcessEventsAsync, вызываемом библиотекой, который получает IEnumerable данных. ProcessEventsAsync должен выполнить некоторую обработку всех данных, а затем отправить их некоторым нижестоящим потребителям. Чтобы повысить производительность, я пытаюсь распараллелить логику в ProcessEventsAsync, используя Задачи. Я также хочу собрать некоторые показатели производительности из этой задачи.

Позвольте мне привести подробный пример того, что я делаю:

internal class MyClass
{

  private String firstDownStreamConnectionString;
  private String secondDownStreamConnectionString;
  private SomeClient firstClient;
  private SomeClient secondClient;
  private ReportingClient reportingClient;
  private int totalUnhandledDataCount;

  public MyClass(String firstDownStreamConnectionString, String secondDownStreamConnectionString, String reportingClientKey)
  {
      this.firstDownStreamConnectionString = firstDownStreamConnectionString;
      this.secondDownStreamConnectionString = secondDownStreamConnectionString;
      this.DegreeOfParallelism = Math.Max(Environment.ProcessorCount - 1, 1);
      this.reportingClient = new ReportingClient (reportingClientKey, DegreeOfParallelism);
      this.totalUnhandledDataCount = 0;
  }
  // called once when the framework signals that processing is about to be ready
  public override async Task OpenAsync(CancellationToken cancellationToken, PartitionContext context)
  {
    this.firstClient = SomeClient.CreateFromConnectionString(this.firstDownStreamConnectionString);
    this.secondClient = SomeClient.CreateFromConnectionString(this.secondDownStreamConnectionString );
    await Task.Yield();
  }

  // this is called repeatedly by the framework
  // outside of startup and shutdown, it is the only entrypoint to my logic
  public override async Task ProcessEventsAsync(CancellationToken cancellationToken, PartitionContext context, IEnumerable<Data> inputData)
  {
    ActionBlock<List<Data>> processorActionBlock = new ActionBlock<List<Data>>(
      inputData =>
      {
        SomeData firstDataset = new SomeData();
        SomeData secondDataset = new SomeData();
        int unhandledDataCount = 0;
        foreach (Data data in inputData)
        {
          // if data fits one set of criteria, put it in firstDataSet
          // if data fits other set of criteria, put it in secondDataSet
          // otherwise increment unhandledDataCount
        }
        Interlocked.Add(ref this.totalUnhandledDataCount, unhandledDataCount);
        lock (this.firstClient)
        {
          try
          {
            firstDataset.SendData(this.firstClient);
          } catch (Exception e)
          {
            lock(this.reportingClient)
            {
              this.reportingClient.LogTrace(e);
            }
          }
        }
        lock (this.secondClient)
        {
          try
          {
            secondDataset.SendData(this.secondClient);
          } catch (Exception e)
          {
            lock(this.reportingClient)
            {
              this.reportingClient.LogTrace(e);
            }
          }
        }
      },
      new ExecutionDataflowBlockOptions
      {
        MaxDegreeOfParallelism = this.DegreeOfParallelism
      });
    // construct as many List<Data> from inputData as there is DegreeOfParallelism
    // put that in a variable called batches
    for(int i = 0; i < DegreeOfParallelism; i++)
    {
      processorActionBlock.Post(batches[i]);
    }
    processorActionBlock.Complete();
    processorActionBlock.Completion.Wait();
    await context.CheckpointAsync();
  }
}

Я пытался сохранить это только для соответствующего кода, я опустил логику обработки, большую часть сбора метрик, способ отправки данных, логику выключения и т. Д.

Я хочу использовать некоторую разновидность Task, которая допускает повторное использование. Я не хочу повторно использовать одно клиентское соединение для всех запущенных Задач этого типа, и при этом я не хочу, чтобы каждая Задача создавала новое клиентское соединение каждый раз, когда оно вызывается. Я хочу, чтобы у каждой задачи типа Thread имелся постоянный набор клиентских подключений. В идеале я также не хочу создавать новый класс, который переносит задачу или расширяет абстрактный класс / интерфейс в System.Threading.Tasks.Dataflow.

Ответы [ 2 ]

0 голосов
/ 23 мая 2019

Похоже, вам просто нужен класс, который хранит зависимости?

void Main()
{
    var doer1 = new ThingDoer();
    var doer2 = new ThingDoer();

    // A & B use one pair of clients, and C & D use another pair
    var taskA = doer1.DoTheThing();
    var taskB = doer1.DoTheThing();

    var taskC = doer2.DoTheThing();
    var taskD = doer2.DoTheThing();
}

public class ThingDoer
{
    private SomeClient _someClient;
    private SomeErrorReportingClient _someErrorReportingClient;

    public ThingDoer(SomeClient someClient, SomeErrorReportingClient someErrorReportingClient)
    {
        _someClient = someClient;
        _someErrorReportingClient = someErrorReportingClient;
    }

    public ThingDoer()
        : this(new SomeClient, new SomeErrorReportingClient)
    {
    }

    public async Task DoTheThing()
    {
        // Implementation here
    }
}

Понятие "повторное использование" не совсем совместимо с задачами.

0 голосов
/ 23 мая 2019

То, что вы описываете, звучит как асинхронный делегат или Func.

Например:

Func<Task> TestFunc = async () =>
{
    Console.WriteLine("Begin");
    await Task.Delay(100);
    Console.WriteLine("Delay");
    await Task.Delay(100);
    Console.WriteLine("End");
};

Если функция находится в области действия, вам просто нужно:

await TestFunc();

Вы можете использовать его столько раз, сколько вам нужно.Вы также можете изменить функцию для приема параметров.


Редактировать

Вы также можете попробовать AsyncLocal .Согласно документации:

Поскольку модель асинхронного программирования на основе задач стремится абстрагировать использование потоков, экземпляры AsyncLocal могут использоваться для сохранения данных в потоках.

Класс AsyncLocal такжепредоставляет необязательные уведомления при изменении значения, связанного с текущим потоком, либо потому, что оно было явно изменено путем установки свойства Value, либо неявно изменено, когда поток обнаружил ожидающий или другой контекстный переход.

...