Я пытаюсь разработать службу Windows для отправки уведомлений подпискам.Данные сохраняются в базе данных сервера SQL.
Уведомления создаются путем отправки веб-запроса POST к конечной точке API REST и сохраняются в таблице базы данных.
Служба запускает одну задачу, которая хранитчтение уведомлений из этой таблицы БД и добавление их в очередь.
Также служба запускает несколько задач, которые продолжают чтение из очереди и выполняют фактический процесс отправки.
Код работает хорошо ивыполнение необходимой работы, но проблема в том, что при запуске службы загрузка процессора составляет 100%.
Я пытался использовать Thread.Sleep или Task.Delay, но ни одна из них не помогла мне уменьшить загрузку процессора.
Я прочитал в этом кодовом проекте страницу , что мне нужно использовать обработчики ожидания и должен ждать при некоторых условиях.Я не мог заставить это работать должным образом.
, поэтому кто-нибудь может посоветовать, что я могу сделать, чтобы уменьшить загрузку процессора для EnqueueTask
и DequeueTask
?
Вот код отправителя:
static class NotificationSender
{
static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;
static Task enqueueTask = null;
static Task[] dequeueTasks = null;
public static void StartSending(ServiceState serviceState)
{
PushService.InitServices();
enqueueTask = Task.Factory.StartNew(EnqueueTask, serviceState);
deliveryQueue = new ConcurrentQueue<NotificationDelivery>();
int dequeueTasksCount = 10;
dequeueTasks = new Task[dequeueTasksCount];
for (int i = 0; i < dequeueTasksCount; i++)
{
dequeueTasks[i] = Task.Factory.StartNew(DequeueTask, serviceState);
}
}
public static void EnqueueTask(object state)
{
ServiceState serviceState = (ServiceState)state;
using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())
{
while (!serviceState.CancellationTokenSource.Token.IsCancellationRequested)
{
int toEnqueue = 100 - deliveryQueue.Count;
if (toEnqueue > 0)
{
// fetch some records from db to be enqueued
NotificationDelivery[] deliveries = db.NotificationDeliveries
.Include("Subscription")
.Include("Notification")
.Include("Notification.NotificationLanguages")
.Include("Notification.NotificationLanguages.Language")
.Where(nd => nd.Status == NotificationDeliveryStatus.Pending && DateTime.Now >= nd.StartSendingAt)
.OrderBy(nd => nd.StartSendingAt)
.Take(toEnqueue)
.ToArray();
foreach (NotificationDelivery delivery in deliveries)
{
delivery.Status = NotificationDeliveryStatus.Queued;
deliveryQueue.Enqueue(delivery);
}
if (deliveries.Length > 0)
{
db.SaveChanges(); // save Queued state, so not fetched again the next loop
}
}
// save any changes made by the DequeueTask
// an event may be used here to know if any changes made
db.SaveChanges();
}
Task.WaitAll(dequeueTasks);
db.SaveChanges();
}
}
public async static void DequeueTask(object state)
{
ServiceState serviceState = (ServiceState)state;
while (!serviceState.CancellationTokenSource.Token.IsCancellationRequested)
{
NotificationDelivery delivery = null;
if (deliveryQueue.TryDequeue(out delivery))
{
NotificationDeliveryStatus ns = NotificationDeliveryStatus.Pending;
if (delivery.Subscription.Status == SubscriptionStatus.Subscribed)
{
PushResult result = await PushService.DoPushAsync(delivery);
switch (result)
{
case PushResult.Pushed:
ns = NotificationDeliveryStatus.Delivered;
break;
case PushResult.Error:
ns = NotificationDeliveryStatus.FailureError;
break;
case PushResult.NotSupported:
ns = NotificationDeliveryStatus.FailureNotSupported;
break;
case PushResult.UnSubscribed:
ns = NotificationDeliveryStatus.FailureUnSubscribed;
delivery.Subscription.Status = SubscriptionStatus.UnSubscribed;
break;
}
}
else
{
ns = NotificationDeliveryStatus.FailureUnSubscribed;
}
delivery.Status = ns;
delivery.DeliveredAt = DateTime.Now;
}
}
}
public static void Wait()
{
Task.WaitAll(enqueueTask);
Task.WaitAll(dequeueTasks);
enqueueTask.Dispose();
for(int i = 0; i < dequeueTasks.Length; i++)
{
dequeueTasks[i].Dispose();
}
}
}
Объект типа ServiceState
используется для поддержки запуска и остановки службы, и вот код для этого типа:
class ServiceState
{
public CancellationTokenSource CancellationTokenSource { get; set; }
public void Start()
{
CancellationTokenSource = new CancellationTokenSource();
NotificationSender.StartSending(this);
}
public void Stop()
{
CancellationTokenSource.Cancel();
NotificationSender.Wait();
CancellationTokenSource.Dispose();
}
}
Вот код запуска и остановки службы:
protected override void OnStart(string[] args)
{
_serviceState = new ServiceState();
_serviceState.Start();
}
protected override void OnStop()
{
_serviceState.Stop();
}