Хорошо, извините, у меня мало времени, но вот общая идея / основа того, что я думал в качестве альтернативы.
Если честно, хотя я думаю, что ActionBlock<T>
- лучший вариант, поскольку для вас так много сделано, единственное ограничение - вы не можете динамически масштабировать объем работы, который он будет выполнять.это когда-то, хотя я думаю, что предел может быть довольно высоким.Если вы начнете делать это таким образом, у вас будет больше контроля или просто будет запущено какое-то динамическое количество задач, но вам придется много чего делать вручную, например, если вы хотите ограничить количество задач, выполняемых приВремя, вам придется внедрить систему очередей (что-то, что ActionBlock обрабатывает для вас), а затем поддерживать ее.Я думаю, это зависит от того, сколько сообщений вы получаете и как быстро ваш процесс обрабатывает их.
Вам придется проверить это и подумать о том, как это может применяться к вашему прямому варианту использования, так как я думаю, что некоторыеобласти деталей, немного схематично реализованной на моей стороне вокруг идеи параллельного пакета.
Таким образом, идея, которую я создал здесь, заключается в том, что вы можете запускать любое количество задач или добавлять к выполняющимся задачам илиОтмените задачи индивидуально, используя коллекцию.
Главное, я думаю, это просто заставить метод, который Callback запускает, запускать поток, который выполняет работу, вместо подписки в отдельном потоке.
Я использовал Task.Factory.StartNew
, как и вы, но сохранил возвращенный объект Task в объекте (TaskInfo
), который также имеет свой CancellationTokenSource, его Id (назначенный извне) в качестве свойств, а затем добавил это кколлекция TaskInfo
, которая является свойством класса, все это является частью:
Обновлено - чтобы избежать путаницы, я только что обновил код, который был здесь ранее.
Вам придется обновить биты и заполнить пробелы в таких местах, как все, что у вас есть для моего HeartbeatController
, и нескольких событиях, которые вызываются, потому что они выходят за рамкивопрос, но идея была бы той же.
public class TaskContainer
{
private ConcurrentBag<TaskInfo> Tasks;
public TaskContainer(){
Tasks = new ConcurrentBag<TaskInfo>();
}
//entry point
//UPDATED
public void StartAndMonitor(int processorCount)
{
for (int i = 0; i <= processorCount; i++)
{
Processor task = new Processor(ProcessorId = i);
CreateProcessorTask(task);
}
this.IsRunning = true;
MonitorTasks();
}
private void CreateProcessorTask(Processor processor)
{
CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
Task taskInstance = Task.Factory.StartNew(
() => processor.Start(cancellationTokenSource.Token)
);
//bind status update event
processor.ProcessorStatusUpdated += ReportProcessorProcess;
Tasks.Add(new ProcessorInfo()
{
ProcessorId = processor.ProcessorId,
Task = taskInstance,
CancellationTokenSource = cancellationTokenSource
});
}
//this method gets called once but the HeartbeatController gets an action as a param that it then
//executes on a timer. I haven't included that but you get the idea
//This method also checks for tasks that have stopped and restarts them if the manifest call says they should be running.
//Will also start any new tasks included in the manifest and stop any that aren't included in the manifest.
internal void MonitorTasks()
{
HeartbeatController.Beat(() =>
{
HeartBeatHappened?.Invoke(this, null);
List<int> tasksToStart = new List<int>();
//this is an api call or whatever drives your config that says what tasks must be running.
var newManifest = this.GetManifest(Properties.Settings.Default.ResourceId);
//task Removed Check - If a Processor is removed from the task pool, cancel it if running and remove it from the Tasks List.
List<int> instanceIds = new List<int>();
newManifest.Processors.ForEach(x => instanceIds.Add(x.ProcessorId));
var removed = Tasks.Select(x => x.ProcessorId).ToList().Except(instanceIds).ToList();
if (removed.Count() > 0)
{
foreach (var extaskId in removed)
{
var task = Tasks.FirstOrDefault(x => x.ProcessorId == extaskId);
task.CancellationTokenSource?.Cancel();
}
}
foreach (var newtask in newManifest.Processors)
{
var oldtask = Tasks.FirstOrDefault(x => x.ProcessorId == newtask.ProcessorId);
//Existing task check
if (oldtask != null && oldtask.Task != null)
{
if (!oldtask.Task.IsCanceled && (oldtask.Task.IsCompleted || oldtask.Task.IsFaulted))
{
var ex = oldtask.Task.Exception;
tasksToStart.Add(oldtask.ProcessorId);
continue;
}
}
else //New task Check
tasksToStart.Add(newtask.ProcessorId);
}
foreach (var item in tasksToStart)
{
var taskToRemove = Tasks.FirstOrDefault(x => x.ProcessorId == item);
if (taskToRemove != null)
Tasks.Remove(taskToRemove);
var task = newManifest.Processors.FirstOrDefault(x => x.ProcessorId == item);
if (task != null)
{
CreateProcessorTask(task);
}
}
});
}
}
//UPDATED
public class Processor{
private int ProcessorId;
private Subsriber<Message> subsriber;
public Processor(int processorId) => ProcessorId = processorId;
public void Start(CancellationToken token)
{
Subsriber<Message> subsriber = new Subsriber<Message>()
{
Interval = 1000
};
subsriber.Callback(Process, m => m != null);
}
private void Process()
{
//do work
}
}
Надеюсь, это даст вам представление о том, как еще вы можете подойти к вашей проблеме, и что я не упустил из виду:).
Обновление
Чтобы использовать события для обновления хода выполнения или задач, которые я обрабатываю, я бы извлек их в свой собственный класс, в котором затем есть методы подписки, а при создании нового экземпляра этого класса назначьте событиек обработчику в родительском классе, который затем может обновить ваш пользовательский интерфейс или все, что вы хотите, чтобы он делал с этой информацией.
Таким образом, содержимое Process()
будет выглядеть примерно так:
Processor processor = new Processor();
Task task = Task.Factory.StartNew(() => processor.ProcessMessage(cancellationTokenSource.CancellationToken));
processor.StatusUpdated += ReportProcess;