С помощью потоков вы можете создавать постоянные, многократно используемые локальные переменные, которые полезны для таких вещей, как клиентские подключения. Тем не менее, с такими задачами, как ActionBlock из System.Threading.Tasks.Dataflow, кажется, что нет никакого вида сохранения или повторного использования блока действия. Так что для ActionBlock, который предполагает взаимодействие с клиентом, я понимаю, что вам нужно либо инициализировать клиентское соединение с нуля, либо повторно использовать его в более широкой области (с блокировкой?).
Вариант использования: я использую библиотеку .NET, которая инвертирует управление. Основная часть логики (кроме запуска и выключения) должна находиться в одном методе Task с именем ProcessEventsAsync, вызываемом библиотекой, который получает IEnumerable данных. ProcessEventsAsync должен выполнить некоторую обработку всех данных, а затем отправить их некоторым нижестоящим потребителям. Чтобы повысить производительность, я пытаюсь распараллелить логику в ProcessEventsAsync, используя Задачи. Я также хочу собрать некоторые показатели производительности из этой задачи.
Позвольте мне привести подробный пример того, что я делаю:
internal class MyClass
{
private String firstDownStreamConnectionString;
private String secondDownStreamConnectionString;
private SomeClient firstClient;
private SomeClient secondClient;
private ReportingClient reportingClient;
private int totalUnhandledDataCount;
public MyClass(String firstDownStreamConnectionString, String secondDownStreamConnectionString, String reportingClientKey)
{
this.firstDownStreamConnectionString = firstDownStreamConnectionString;
this.secondDownStreamConnectionString = secondDownStreamConnectionString;
this.DegreeOfParallelism = Math.Max(Environment.ProcessorCount - 1, 1);
this.reportingClient = new ReportingClient (reportingClientKey, DegreeOfParallelism);
this.totalUnhandledDataCount = 0;
}
// called once when the framework signals that processing is about to be ready
public override async Task OpenAsync(CancellationToken cancellationToken, PartitionContext context)
{
this.firstClient = SomeClient.CreateFromConnectionString(this.firstDownStreamConnectionString);
this.secondClient = SomeClient.CreateFromConnectionString(this.secondDownStreamConnectionString );
await Task.Yield();
}
// this is called repeatedly by the framework
// outside of startup and shutdown, it is the only entrypoint to my logic
public override async Task ProcessEventsAsync(CancellationToken cancellationToken, PartitionContext context, IEnumerable<Data> inputData)
{
ActionBlock<List<Data>> processorActionBlock = new ActionBlock<List<Data>>(
inputData =>
{
SomeData firstDataset = new SomeData();
SomeData secondDataset = new SomeData();
int unhandledDataCount = 0;
foreach (Data data in inputData)
{
// if data fits one set of criteria, put it in firstDataSet
// if data fits other set of criteria, put it in secondDataSet
// otherwise increment unhandledDataCount
}
Interlocked.Add(ref this.totalUnhandledDataCount, unhandledDataCount);
lock (this.firstClient)
{
try
{
firstDataset.SendData(this.firstClient);
} catch (Exception e)
{
lock(this.reportingClient)
{
this.reportingClient.LogTrace(e);
}
}
}
lock (this.secondClient)
{
try
{
secondDataset.SendData(this.secondClient);
} catch (Exception e)
{
lock(this.reportingClient)
{
this.reportingClient.LogTrace(e);
}
}
}
},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = this.DegreeOfParallelism
});
// construct as many List<Data> from inputData as there is DegreeOfParallelism
// put that in a variable called batches
for(int i = 0; i < DegreeOfParallelism; i++)
{
processorActionBlock.Post(batches[i]);
}
processorActionBlock.Complete();
processorActionBlock.Completion.Wait();
await context.CheckpointAsync();
}
}
Я пытался сохранить это только для соответствующего кода, я опустил логику обработки, большую часть сбора метрик, способ отправки данных, логику выключения и т. Д.
Я хочу использовать некоторую разновидность Task, которая допускает повторное использование. Я не хочу повторно использовать одно клиентское соединение для всех запущенных Задач этого типа, и при этом я не хочу, чтобы каждая Задача создавала новое клиентское соединение каждый раз, когда оно вызывается. Я хочу, чтобы у каждой задачи типа Thread имелся постоянный набор клиентских подключений. В идеале я также не хочу создавать новый класс, который переносит задачу или расширяет абстрактный класс / интерфейс в System.Threading.Tasks.Dataflow.