Как обрабатывать события в реальном времени, которые срабатывают до завершения обработки предыдущего события (C #) - PullRequest
2 голосов
/ 27 декабря 2011

Предположим, у нас есть прослушиватель события в реальном времени, который выполняет некоторый блок кода при инициировании события.

для нашего обсуждения, скажем, у нас есть класс MyTime, который имеет член currentTime.

мы настроили его так, чтобы при каждом изменении часов компьютера значение currentTime устанавливалось на значение текущего времени.Мы реализовали свойство измененного интерфейса INotifyPropertyChanged для нашего объекта currentTime:

public event PropertyChangedEventHandler PropertyChanged;


       public string currentTime
            {
                get { return _currentTime; }
                set { _currentTime= value; this.NotifyPropertyChanged("currentTime"); } 
             }


    public void NotifyPropertyChanged(object sender, PropertyChangedEventArgs e) {

    if (PropertyChanged != null)
                PropertyChanged(this, new PropertyChangedEventArgs(name));

    }

Какой-то другой класс, скажем, ProcessTime, прослушивает это событие:

TimeChanged += new PropertyChangedEventHandler(PropertyChanged};

, и у него есть функция, котораявыполнит что-то:

public void TimeChanged(object sender, PropertyChangedEventArgs e)
{

// Process lots of calculations

}

Поскольку часы нашего компьютера постоянно меняются, оно будет запускать событие.В моем понимании, когда произойдет первое изменение, мы выполним блок TimeChanged.Пока мы выполняем, мы будем продолжать получать все больше и больше уведомлений и обрабатывать их как можно быстрее, создавая длинную очередь событий, которые еще предстоит обработать.

Проблема заключается в том, что после обработки первого изменения времении перейдем к следующему изменению времени, «реальное время» уже далеко впереди, и все, что мы рассчитываем, мы рассчитываем на то, что произошло в прошлом.

То, что мы хотели бы сделать, это игнорироватьвсе новые события до тех пор, пока мы не закончим нашу первоначальную обработку, и только затем снова начнем прослушивать событие.

Настройка нескольких потоков не является опцией, поскольку она не решает проблему, и мы не хотим обрабатыватькаждый раз меняются только те, когда наши ресурсы высвобождаются.

Очевидно, я использовал изменение времени и приведенный выше код в качестве наглядного примера, но он демонстрирует кратко и адекватно (ИМХО) то, что мы пытаемсячтобы выполнить здесь.

Я хотел бы использовать какой-то буфер, но мойзнание здесь очень и очень ограничено.Спасибо

Спасибо за все ответы до сих пор.Начнем реализацию.Постараюсь задокументировать успехи / неудачи.

Ответы [ 4 ]

2 голосов
/ 27 декабря 2011

Ну, во-первых, рассматриваемое событие не вызывается асинхронно.Поэтому, если вы не устанавливаете время для постоянно меняющихся потоков, вызов для установки времени не будет возвращаться, и вы не установите его снова, пока все события не обработают его.Если вы хотите предотвратить эту проблему, вам нужно перенести обработку событий в другой поток.

В конечном счете, сложность ситуации и то, насколько точно вы хотите быть в реальном времени, могут предопределить окончательный ответ на этот вопрос.Но, предполагая, что вы хотите что-то достаточно надежное для относительно небольшого числа потоков (скажем, дюжины), вот примерно то, как я бы поступил в этом направлении.

private var _Callbacks = new List<PropertyChangedEventHandler>();

public event PropertyChangedEventHandler PropertyChanged
{
    add
    {
        lock(_Callbacks)
            _Callbacks.Add(value);

        Thread Worker = new Thread(PollTime);
        Worker.Background = true;
        Worker.Start(value);
    }
    remove
    {
        lock(_Callbacks)
            _Callbacks.Remove(value);
    }
}

private void PollTime(object callback)
{
    PropertyChangedEventHandler c = (PropertyChangedEventHandler)callback;
    string LastReported = null;

    while(true)
    {
        lock(_Callbacks)
            if (!_Callbacks.Contains(c))
                return;

        if (LastReported != _currentTime)
        {
            LastReported = _currentTime;
            c(this, new PropertyChangedEventArgs(name));
        }
        else
            Thread.Sleep(10);
    }
}

public string currentTime
{
    get { return _currentTime; }
    set { _currentTime= value; } 
}

С этим вы обеспечите безопасность потоков наваши события (в случае, если кто-то пытается подписаться / отписаться от них в неподходящее время), и каждый подписчик получает свой собственный поток для обработки обратных вызовов.Подписчики не получат все те же события, но они все будут уведомлены, когда время изменится.Более медленные не получат столько событий, потому что потеряют некоторые промежуточные значения.Это не будет уведомлять, если время сбрасывается без изменений, но я не вижу такой большой потери.Вы можете столкнуться с проблемами, если значения чередуются в ограниченном наборе, но со временем это не проблема.

Для получения дополнительной информации о делегатах, событиях и их работе есть очень длинная, но очень хорошая статья в http://www.sellsbrothers.com/writing/delegates.htm

2 голосов
/ 27 декабря 2011

Это был бы мой подход.

  1. Не позволяйте потребителю блокировать поток событий производителя.
  2. Создать облегченную «критическую секцию» (в основном, атомарную переменную условия), чтобы в один момент времени мог быть активен только один вызов обработчика потребителя.

Вот полный пример, который реализует эту логику. Существует EventProducer и EventConsumer. Они могут быть настроены, чтобы быть быстрее или медленнее, чем друг друга по мере необходимости. Производитель событий создает фоновый поток для создания событий. EventConsumer использует пользовательский класс CriticalSectionSlim с простым шаблоном TryEnter / Exit, чтобы избежать одновременного вызова кода обработки. Он также отправляет код обработки в пул потоков, используя поведение по умолчанию класса .NET 4.0 Task. Если возникает исключение, оно перебрасывается из основного потока обработчика при следующем вызове.

using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;

internal sealed class Program
{
    private static void Main(string[] args)
    {
        using (EventProducer producer = new EventProducer(TimeSpan.FromMilliseconds(250.0d)))
        using (EventConsumer consumer = new EventConsumer(producer, TimeSpan.FromSeconds(1.0d)))
        {
            Console.WriteLine("Press ENTER to stop.");
            Console.ReadLine();
        }

        Console.WriteLine("Done.");
    }

    private static class ConsoleLogger
    {
        public static void WriteLine(string message)
        {
            Console.WriteLine(
                "[{0}]({1}) {2}",
                DateTime.Now.ToString("hh:mm:ss.fff", CultureInfo.InvariantCulture),
                Thread.CurrentThread.ManagedThreadId,
                message);
        }
    }

    private sealed class EventConsumer : IDisposable
    {
        private readonly CriticalSectionSlim criticalSection;
        private readonly EventProducer producer;
        private readonly TimeSpan processingTime;

        private Task currentTask;

        public EventConsumer(EventProducer producer, TimeSpan processingTime)
        {
            if (producer == null)
            {
                throw new ArgumentNullException("producer");
            }

            if (processingTime < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException("processingTime");
            }

            this.processingTime = processingTime;
            this.criticalSection = new CriticalSectionSlim();
            this.producer = producer;
            this.producer.SomethingHappened += this.OnSomethingHappened;
        }

        public void Dispose()
        {
            this.Dispose(true);
            GC.SuppressFinalize(this);
        }

        private void Dispose(bool disposing)
        {
            if (disposing)
            {
                this.producer.SomethingHappened -= this.OnSomethingHappened;
            }
        }

        private void OnSomethingHappened(object sender, EventArgs e)
        {
            if (this.criticalSection.TryEnter())
            {
                try
                {
                    this.StartTask();
                }
                catch (Exception)
                {
                    this.criticalSection.Exit();
                    throw;
                }
            }
        }

        private void StartTask()
        {
            if (this.currentTask != null)
            {
                this.currentTask.Wait();
            }

            this.currentTask = Task.Factory.StartNew(this.OnSomethingHappenedTask);
        }

        private void OnSomethingHappenedTask()
        {
            try
            {
                this.OnSomethingHappenedImpl();
            }
            finally
            {
                this.criticalSection.Exit();
            }
        }

        private void OnSomethingHappenedImpl()
        {
            ConsoleLogger.WriteLine("BEGIN: Consumer processing.");
            Thread.CurrentThread.Join(this.processingTime);
            ConsoleLogger.WriteLine("END:   Consumer processing.");
        }
    }

    private sealed class EventProducer : IDisposable
    {
        private readonly TimeSpan timeBetweenEvents;
        private readonly Thread thread;
        private volatile bool shouldStop;

        public EventProducer(TimeSpan timeBetweenEvents)
        {
            if (timeBetweenEvents < TimeSpan.Zero)
            {
                throw new ArgumentOutOfRangeException("timeBetweenEvents");
            }

            this.timeBetweenEvents = timeBetweenEvents;
            this.thread = new Thread(this.Run);
            this.thread.Start();
        }

        public event EventHandler SomethingHappened;

        public void Dispose()
        {
            this.Dispose(true);
            GC.SuppressFinalize(this);
        }

        private void Dispose(bool disposing)
        {
            if (disposing)
            {
                this.shouldStop = true;
                this.thread.Join();
            }
        }

        private void Run()
        {
            while (!shouldStop)
            {
                this.RaiseEvent();
                Thread.CurrentThread.Join(this.timeBetweenEvents);
            }
        }

        private void RaiseEvent()
        {
            EventHandler handler = this.SomethingHappened;
            if (handler != null)
            {
                ConsoleLogger.WriteLine("Producer is raising event.");
                handler(this, EventArgs.Empty);
            }
        }
    }

    private sealed class CriticalSectionSlim
    {
        private int active;

        public CriticalSectionSlim()
        {
        }

        public bool TryEnter()
        {
            return Interlocked.CompareExchange(ref this.active, 1, 0) == 0;
        }

        public void Exit()
        {
            Interlocked.Exchange(ref this.active, 0);
        }
    }
}
1 голос
/ 27 декабря 2011

Ну, вы можете сделать что-то вроде этого

, как только вы получите событие, отмените регистрацию события или другими словами прекратите слушать событие больше.

Как только вы закончите с вашим событиемобработка начинается прослушивание события снова путем повторной регистрации на событие

public void TimeChanged(object sender, PropertyChangedEventArgs e)
{

//un register
TimeChanged -= new PropertyChangedEventHandler(PropertyChanged};

// Process lots of calculations

//re-register
TimeChanged += new PropertyChangedEventHandler(PropertyChanged};

}
0 голосов
/ 27 декабря 2011

Я предлагаю поместить все запланированные задачи в очередь, упорядоченную по времени процесса (DateTime). Событие времени (тик) нужно только для проверки того, находится ли задача во главе очереди как «ожидающая выполнения». Это если его время для обработки было достигнуто или прошло. Затем эта задача удаляется из очереди с учетом текущего времени и выполняется.

Задача уведомляет очередь задачи об окончании обратным вызовом, данным в методе execute (который, по-видимому, также занимал текущее время). Очередь задач не будет выполнять никаких других задач во время ее выполнения. Когда задача уведомляет о завершении, очередь задач немедленно проверяет, находится ли задача, если таковая имеется, в начале очереди и т. Д.

Теперь приятное уточнение, которое можно сделать здесь, когда у вас есть упорядоченная по времени очередь задач, состоит в том, что вместо периодических тиков вы устанавливаете таймер на срабатывание (или один раз меняющийся слушатель), когда пришло время выполнить задачу во главе очередь, как вы всегда знаете время до следующего события. Нет необходимости в нескольких слушателях и одном контроллере для выполнения и т. Д.

interface ITask
{
    void Execute(ITaskCallBack callBack, DateTime currentTime);
}

interface ITaskCallBack
{
    void OnCompleted(ITask task); // The task parameter is needed for concurrency
}

Каждый раз, когда задача добавляется или удаляется, обновляется время до следующего события.

Важное замечание: Если вы добавляете новую задачу, которая должна выполняться одновременно с существующей, вы должны добавить ее после всех задач в одно и то же время. Это позволяет избежать дочерних задач, которые могут привести к зависанию планировщика.

Временная очередь - это ваш планировщик задач / контроллер. Вы можете сделать его однопоточным или многопоточным, как вам нравится. Хотя я не вижу особого смысла в многопоточности, если вы не используете несколько процессоров.

interface ITaskScheduler
{
    void Add(ITask task, DateTime executeTime);
    void Remove(ITask);
}

Еще одно приятное замечание: планировщик знает запланированное время и время его начала. Следовательно, у вас есть ценная диагностика для отложенных задач или задержек из-за загрузки. Важно, если ваша система нуждается в детерминированной производительности.

Надеюсь, что это имеет смысл и полезно.

С уважением

...