Как снизить нагрузку на процессор в многопоточных отправителях уведомлений / сообщений на основе БД - PullRequest
0 голосов
/ 13 декабря 2018

Я пытаюсь разработать службу Windows для отправки уведомлений подпискам.Данные сохраняются в базе данных сервера SQL.

Уведомления создаются путем отправки веб-запроса POST к конечной точке API REST и сохраняются в таблице базы данных.

Служба запускает одну задачу, которая хранитчтение уведомлений из этой таблицы БД и добавление их в очередь.

Также служба запускает несколько задач, которые продолжают чтение из очереди и выполняют фактический процесс отправки.

Код работает хорошо ивыполнение необходимой работы, но проблема в том, что при запуске службы загрузка процессора составляет 100%.

Я пытался использовать Thread.Sleep или Task.Delay, но ни одна из них не помогла мне уменьшить загрузку процессора.

Я прочитал в этом кодовом проекте страницу , что мне нужно использовать обработчики ожидания и должен ждать при некоторых условиях.Я не мог заставить это работать должным образом.

, поэтому кто-нибудь может посоветовать, что я могу сделать, чтобы уменьшить загрузку процессора для EnqueueTask и DequeueTask?

Вот код отправителя:

static class NotificationSender
{
    static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;
    static Task enqueueTask = null;
    static Task[] dequeueTasks = null;

    public static void StartSending(ServiceState serviceState)
    {
        PushService.InitServices();

        enqueueTask = Task.Factory.StartNew(EnqueueTask, serviceState);

        deliveryQueue = new ConcurrentQueue<NotificationDelivery>();

        int dequeueTasksCount = 10;
        dequeueTasks = new Task[dequeueTasksCount];
        for (int i = 0; i < dequeueTasksCount; i++)
        {
            dequeueTasks[i] = Task.Factory.StartNew(DequeueTask, serviceState);
        }
    }

    public static void EnqueueTask(object state)
    {
        ServiceState serviceState = (ServiceState)state;

        using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())
        {
            while (!serviceState.CancellationTokenSource.Token.IsCancellationRequested)
            {
                int toEnqueue = 100 - deliveryQueue.Count;

                if (toEnqueue > 0)
                {
                    // fetch some records from db to be enqueued
                    NotificationDelivery[] deliveries = db.NotificationDeliveries
                        .Include("Subscription")
                        .Include("Notification")
                        .Include("Notification.NotificationLanguages")
                        .Include("Notification.NotificationLanguages.Language")
                        .Where(nd => nd.Status == NotificationDeliveryStatus.Pending && DateTime.Now >= nd.StartSendingAt)
                        .OrderBy(nd => nd.StartSendingAt)
                        .Take(toEnqueue)
                        .ToArray();

                    foreach (NotificationDelivery delivery in deliveries)
                    {
                        delivery.Status = NotificationDeliveryStatus.Queued;
                        deliveryQueue.Enqueue(delivery);
                    }

                    if (deliveries.Length > 0)
                    {
                        db.SaveChanges(); // save Queued state, so not fetched again the next loop
                    }
                }

                // save any changes made by the DequeueTask
                // an event may be used here to know if any changes made
                db.SaveChanges();
            }

            Task.WaitAll(dequeueTasks);
            db.SaveChanges();
        }
    }

    public async static void DequeueTask(object state)
    {
        ServiceState serviceState = (ServiceState)state;

        while (!serviceState.CancellationTokenSource.Token.IsCancellationRequested)
        {
            NotificationDelivery delivery = null;

            if (deliveryQueue.TryDequeue(out delivery))
            {
                NotificationDeliveryStatus ns = NotificationDeliveryStatus.Pending;
                if (delivery.Subscription.Status == SubscriptionStatus.Subscribed)
                {
                    PushResult result = await PushService.DoPushAsync(delivery);

                    switch (result)
                    {
                        case PushResult.Pushed:
                            ns = NotificationDeliveryStatus.Delivered;
                            break;
                        case PushResult.Error:
                            ns = NotificationDeliveryStatus.FailureError;
                            break;
                        case PushResult.NotSupported:
                            ns = NotificationDeliveryStatus.FailureNotSupported;
                            break;
                        case PushResult.UnSubscribed:
                            ns = NotificationDeliveryStatus.FailureUnSubscribed;
                            delivery.Subscription.Status = SubscriptionStatus.UnSubscribed;
                            break;
                    }
                }
                else
                {
                    ns = NotificationDeliveryStatus.FailureUnSubscribed;
                }

                delivery.Status = ns;
                delivery.DeliveredAt = DateTime.Now;
            }
        }
    }

    public static void Wait()
    {
        Task.WaitAll(enqueueTask);
        Task.WaitAll(dequeueTasks);

        enqueueTask.Dispose();
        for(int i = 0; i < dequeueTasks.Length; i++)
        {
            dequeueTasks[i].Dispose();
        }
    }
}

Объект типа ServiceState используется для поддержки запуска и остановки службы, и вот код для этого типа:

class ServiceState
{
    public CancellationTokenSource CancellationTokenSource { get; set; }

    public void Start()
    {
        CancellationTokenSource = new CancellationTokenSource();

        NotificationSender.StartSending(this);
    }

    public void Stop()
    {
        CancellationTokenSource.Cancel();

        NotificationSender.Wait();
        CancellationTokenSource.Dispose();
    }
}

Вот код запуска и остановки службы:

protected override void OnStart(string[] args)
{
    _serviceState = new ServiceState();
    _serviceState.Start();
}

protected override void OnStop()
{
    _serviceState.Stop();
}

1 Ответ

0 голосов
/ 13 декабря 2018

Я думаю, что наконец смогу сделать хорошие изменения, чтобы сохранить загрузку процессора, используя обработчики ожидания и таймер.

EnqueueTask будет ждать 5 секунд, прежде чем снова попытаться извлечь данные из таблицы уведомлений, если уведомления не извлечены,Если никакие уведомления не получены, он запустит таймер и сбросит дескриптор ожидания.Обратный вызов по истечении таймера установит дескриптор ожидания.

Также DequeueTask теперь использует дескриптор ожидания.Если в очереди больше нет элементов, он сбросит дескриптор ожидания, чтобы прекратить удаление пустой очереди.EnqueueTask установит этот дескриптор ожидания при добавлении элементов в очередь.

Использование ЦП теперь <= 10% </p>

А вот обновленный NotificationSender код:

static class NotificationSender
{
    static ConcurrentQueue<NotificationDelivery> deliveryQueue = null;
    static Task enqueueTask = null;
    static Task[] dequeueTasks = null;

    static ManualResetEvent enqueueSignal = null;
    static ManualResetEvent dequeueSignal = null;

    static System.Timers.Timer enqueueTimer = null;

    public static void StartSending(CancellationToken token)
    {
        PushService.InitServices();

        using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())
        {
            NotificationDelivery[] queuedDeliveries = db.NotificationDeliveries
                        .Where(nd => nd.Status == NotificationDeliveryStatus.Queued)
                        .ToArray();

            foreach (NotificationDelivery delivery in queuedDeliveries)
            {
                delivery.Status = NotificationDeliveryStatus.Pending;
            }

            db.SaveChanges();
        }

        enqueueSignal = new ManualResetEvent(true);
        dequeueSignal = new ManualResetEvent(false);

        enqueueTimer = new System.Timers.Timer();
        enqueueTimer.Elapsed += EnqueueTimerCallback;
        enqueueTimer.Interval = 5000;
        enqueueTimer.AutoReset = false;
        enqueueTimer.Stop();

        enqueueTask = new Task(EnqueueTask, token, TaskCreationOptions.LongRunning);
        enqueueTask.Start();

        deliveryQueue = new ConcurrentQueue<NotificationDelivery>();

        int dequeueTasksCount = 10;
        dequeueTasks = new Task[dequeueTasksCount];
        for (int i = 0; i < dequeueTasksCount; i++)
        {
            dequeueTasks[i] = new Task(DequeueTask, token, TaskCreationOptions.LongRunning);
            dequeueTasks[i].Start();
        }
    }

    public static void EnqueueTimerCallback(Object source, ElapsedEventArgs e)
    {
        enqueueSignal.Set();
        enqueueTimer.Stop();
    }

    public static void EnqueueTask(object state)
    {
        CancellationToken token = (CancellationToken)state;

        using (DSTeckWebPushNotificationsContext db = new DSTeckWebPushNotificationsContext())
        {
            while (!token.IsCancellationRequested)
            {
                if (enqueueSignal.WaitOne())
                {
                    int toEnqueue = 100 - deliveryQueue.Count;

                    if (toEnqueue > 0)
                    {
                        // fetch some records from db to be enqueued
                        NotificationDelivery[] deliveries = db.NotificationDeliveries
                            .Include("Subscription")
                            .Include("Notification")
                            .Include("Notification.NotificationLanguages")
                            .Include("Notification.NotificationLanguages.Language")
                            .Where(nd => nd.Status == NotificationDeliveryStatus.Pending && DateTime.Now >= nd.StartSendingAt)
                            .OrderBy(nd => nd.StartSendingAt)
                            .Take(toEnqueue)
                            .ToArray();

                        foreach (NotificationDelivery delivery in deliveries)
                        {
                            delivery.Status = NotificationDeliveryStatus.Queued;
                            deliveryQueue.Enqueue(delivery);
                        }

                        if (deliveries.Length > 0)
                        {
                            // save Queued state, so not fetched again the next loop
                            db.SaveChanges();

                            // signal the DequeueTask
                            dequeueSignal.Set();
                        }
                        else
                        {
                            // no more notifications, wait 5 seconds before try fetching again
                            enqueueSignal.Reset();
                            enqueueTimer.Start();
                        }
                    }

                    // save any changes made by the DequeueTask
                    // an event may be used here to know if any changes made
                    db.SaveChanges();
                }
            }

            Task.WaitAll(dequeueTasks);
            db.SaveChanges();
        }
    }

    public async static void DequeueTask(object state)
    {
        CancellationToken token = (CancellationToken)state;

        while (!token.IsCancellationRequested)
        {
            if (dequeueSignal.WaitOne()) // block untill we have items in the queue
            {
                NotificationDelivery delivery = null;

                if (deliveryQueue.TryDequeue(out delivery))
                {
                    NotificationDeliveryStatus ns = NotificationDeliveryStatus.Pending;
                    if (delivery.Subscription.Status == SubscriptionStatus.Subscribed)
                    {
                        PushResult result = await PushService.DoPushAsync(delivery);

                        switch (result)
                        {
                            case PushResult.Pushed:
                                ns = NotificationDeliveryStatus.Delivered;
                                break;
                            case PushResult.Error:
                                ns = NotificationDeliveryStatus.FailureError;
                                break;
                            case PushResult.NotSupported:
                                ns = NotificationDeliveryStatus.FailureNotSupported;
                                break;
                            case PushResult.UnSubscribed:
                                ns = NotificationDeliveryStatus.FailureUnSubscribed;
                                delivery.Subscription.Status = SubscriptionStatus.UnSubscribed;
                                break;
                        }
                    }
                    else
                    {
                        ns = NotificationDeliveryStatus.FailureUnSubscribed;
                    }

                    delivery.Status = ns;
                    delivery.DeliveredAt = DateTime.Now;
                }
                else
                {
                    // empty queue, no more items
                    // stop dequeueing untill new items added by EnqueueTask
                    dequeueSignal.Reset();
                }
            }
        }
    }

    public static void Wait()
    {
        Task.WaitAll(enqueueTask);
        Task.WaitAll(dequeueTasks);

        enqueueTask.Dispose();
        for(int i = 0; i < dequeueTasks.Length; i++)
        {
            dequeueTasks[i].Dispose();
        }
    }
}
...