Как мне реализовать «тихий период» при поднятии событий? - PullRequest
5 голосов
/ 24 августа 2010

Я использую шаблон подписчик / уведомитель для получения и получения событий из моего среднего уровня .Net в C #.Некоторые из событий вызываются «пакетами», например, когда данные сохраняются из пакетной программы, импортирующей файл.Это выполняет потенциально долгосрочную задачу, и я хотел бы избежать запуска события несколько раз в секунду, реализуя «тихий период», при котором система событий ожидает, пока поток событий не замедлится для обработки события.

Как мне это сделать, когда издатель принимает активное участие в уведомлении подписчиков?Я не хочу ждать, пока придет событие, чтобы проверить, есть ли другие, ожидающие тихий период ...

В данный момент не существует хост-процесса для опроса модели подписки.Должен ли я отказаться от шаблона публикации / подписки или есть лучший способ?

Ответы [ 3 ]

1 голос
/ 24 августа 2010

Вот грубая реализация, которая может указать вам направление.В моем примере задача, которая включает в себя уведомление, - это сохранение объекта данных.Когда объект сохранен, возникает событие Saved.В дополнение к простому методу Save, я реализовал методы BeginSave и EndSave, а также перегрузку Save, которая работает с этими двумя для пакетного сохранения.Когда вызывается EndSave, запускается одно событие BatchSaved.

Очевидно, вы можете изменить это в соответствии со своими потребностями.В моем примере я отслеживал список всех объектов, которые были сохранены во время пакетной операции, но это, возможно, не то, что вам нужно было бы делать ... вы можете заботиться только о том, сколько объектов было сохранено, или даже просточто операция пакетного сохранения была завершена.Если вы ожидаете сохранения большого количества объектов, то сохранение их в списке, как в моем примере, может стать проблемой с памятью.

РЕДАКТИРОВАТЬ: я добавил концепцию «порога» в мой пример, которая пытается предотвратитьбольшое количество объектов, хранящихся в памяти.Однако это приводит к более частому запуску события BatchSaved.Я также добавил некоторую блокировку для решения проблемы потенциальной безопасности потока, хотя, возможно, я что-то там упустил.

class DataConcierge<T>
{
    // *************************
    // Simple save functionality
    // *************************

    public void Save(T dataObject)
    {
        // perform save logic

        this.OnSaved(dataObject);
    }

    public event DataObjectSaved<T> Saved;

    protected void OnSaved(T dataObject)
    {
        var saved = this.Saved;
        if (saved != null)
            saved(this, new DataObjectEventArgs<T>(dataObject));
    }

    // ************************
    // Batch save functionality
    // ************************

    Dictionary<BatchToken, List<T>> _BatchSavedDataObjects = new Dictionary<BatchToken, List<T>>();
    System.Threading.ReaderWriterLockSlim _BatchSavedDataObjectsLock = new System.Threading.ReaderWriterLockSlim();

    int _SavedObjectThreshold = 17; // if the number of objects being stored for a batch reaches this threshold, then those objects are to be cleared from the list.

    public BatchToken BeginSave()
    {
        // create a batch token to represent this batch
        BatchToken token = new BatchToken();

        _BatchSavedDataObjectsLock.EnterWriteLock();
        try
        {
            _BatchSavedDataObjects.Add(token, new List<T>());
        }
        finally
        {
            _BatchSavedDataObjectsLock.ExitWriteLock();
        }
        return token;
    }

    public void EndSave(BatchToken token)
    {
        List<T> batchSavedDataObjects;
        _BatchSavedDataObjectsLock.EnterWriteLock();
        try
        {
            if (!_BatchSavedDataObjects.TryGetValue(token, out batchSavedDataObjects))
                throw new ArgumentException("The BatchToken is expired or invalid.", "token");

            this.OnBatchSaved(batchSavedDataObjects); // this causes a single BatchSaved event to be fired

            if (!_BatchSavedDataObjects.Remove(token))
                throw new ArgumentException("The BatchToken is expired or invalid.", "token");
        }
        finally
        {
            _BatchSavedDataObjectsLock.ExitWriteLock();
        }
    }

    public void Save(BatchToken token, T dataObject)
    {
        List<T> batchSavedDataObjects;
        // the read lock prevents EndSave from executing before this Save method has a chance to finish executing
        _BatchSavedDataObjectsLock.EnterReadLock();
        try
        {
            if (!_BatchSavedDataObjects.TryGetValue(token, out batchSavedDataObjects))
                throw new ArgumentException("The BatchToken is expired or invalid.", "token");

            // perform save logic

            this.OnBatchSaved(batchSavedDataObjects, dataObject);
        }
        finally
        {
            _BatchSavedDataObjectsLock.ExitReadLock();
        }
    }

    public event BatchDataObjectSaved<T> BatchSaved;

    protected void OnBatchSaved(List<T> batchSavedDataObjects)
    {
        lock (batchSavedDataObjects)
        {
            var batchSaved = this.BatchSaved;
            if (batchSaved != null)
                batchSaved(this, new BatchDataObjectEventArgs<T>(batchSavedDataObjects));
        }
    }

    protected void OnBatchSaved(List<T> batchSavedDataObjects, T savedDataObject)
    {
        // add the data object to the list storing the data objects that have been saved for this batch
        lock (batchSavedDataObjects)
        {
            batchSavedDataObjects.Add(savedDataObject);

            // if the threshold has been reached
            if (_SavedObjectThreshold > 0 && batchSavedDataObjects.Count >= _SavedObjectThreshold)
            {
                // then raise the BatchSaved event with the data objects that we currently have
                var batchSaved = this.BatchSaved;
                if (batchSaved != null)
                    batchSaved(this, new BatchDataObjectEventArgs<T>(batchSavedDataObjects.ToArray()));

                // and clear the list to ensure that we are not holding on to the data objects unnecessarily
                batchSavedDataObjects.Clear();
            }
        }
    }
}

class BatchToken
{
    static int _LastId = 0;
    static object _IdLock = new object();

    static int GetNextId()
    {
        lock (_IdLock)
        {
            return ++_LastId;
        }
    }

    public BatchToken()
    {
        this.Id = GetNextId();
    }

    public int Id { get; private set; }
}

class DataObjectEventArgs<T> : EventArgs
{
    public T DataObject { get; private set; }

    public DataObjectEventArgs(T dataObject)
    {
        this.DataObject = dataObject;
    }
}

delegate void DataObjectSaved<T>(object sender, DataObjectEventArgs<T> e);

class BatchDataObjectEventArgs<T> : EventArgs
{
    public IEnumerable<T> DataObjects { get; private set; }

    public BatchDataObjectEventArgs(IEnumerable<T> dataObjects)
    {
        this.DataObjects = dataObjects;
    }
}

delegate void BatchDataObjectSaved<T>(object sender, BatchDataObjectEventArgs<T> e);

В моем примере я решил использовать концепцию токена для создания отдельных пакетов.Это позволяет завершать меньшие пакетные операции, выполняемые в отдельных потоках, и вызывать события, не дожидаясь завершения крупной пакетной операции.

Я сделал отдельные события: Saved и BatchSaved.Однако их так же легко можно объединить в одно событие.

РЕДАКТИРОВАТЬ: фиксированные условия гонки, указанные Стивеном Судитом при доступе к делегатам события.

РЕДАКТИРОВАТЬ: исправленный код блокировки в моем примереиспользовать ReaderWriterLockSlim, а не Monitor (то есть оператор «lock»).Я думаю, что было несколько условий гонки, например, между методами Save и EndSave.Было возможно выполнить EndSave, в результате чего список объектов данных был удален из словаря.Если бы метод Save одновременно выполнялся в другом потоке, можно было бы добавить объект данных в этот список, даже если он уже был удален из словаря.

В моем пересмотренном примере, эта ситуация не может произойти, и метод Save сгенерирует исключение, если он выполнится после EndSave.Эти гоночные условия были вызваны прежде всего тем, что я пытался избежать того, что я считал ненужной блокировкой.Я понял, что в блокировке должно быть больше кода, но решил использовать ReaderWriterLockSlim вместо Monitor, потому что я только хотел предотвратить одновременное выполнение Save и EndSave;не было необходимости препятствовать одновременному выполнению Save несколькими потоками.Обратите внимание, что монитор по-прежнему используется для синхронизации доступа к определенному списку объектов данных, извлеченных из словаря.

РЕДАКТИРОВАТЬ: добавлен пример использования

Ниже приведен пример использования приведенного выше примера кода.

    static void DataConcierge_Saved(object sender, DataObjectEventArgs<Program.Customer> e)
    {
        Console.WriteLine("DataConcierge<Customer>.Saved");
    }

    static void DataConcierge_BatchSaved(object sender, BatchDataObjectEventArgs<Program.Customer> e)
    {
        Console.WriteLine("DataConcierge<Customer>.BatchSaved: {0}", e.DataObjects.Count());
    }

    static void Main(string[] args)
    {
        DataConcierge<Customer> dc = new DataConcierge<Customer>();
        dc.Saved += new DataObjectSaved<Customer>(DataConcierge_Saved);
        dc.BatchSaved += new BatchDataObjectSaved<Customer>(DataConcierge_BatchSaved);

        var token = dc.BeginSave();
        try
        {
            for (int i = 0; i < 100; i++)
            {
                var c = new Customer();
                // ...
                dc.Save(token, c);
            }
        }
        finally
        {
            dc.EndSave(token);
        }
    }

Это привело к следующему выводу:

DataConcierge .BatchSaved: 17

DataConcierge .BatchSaved: 17

DataConcierge .BatchSaved: 17

DataConcierge .BatchSaved: 17

DataConcierge .BatchSaved: 17

DataConcierge .BatchSaved: 15

Порог в моем примере установлен на 17, поэтому пакет из 100 элементов вызывает событие BatchSaved 6 раз.

1 голос
/ 24 августа 2010

Я не уверен, правильно ли я понял ваш вопрос, но я постараюсь исправить проблему у источника - убедитесь, что события не вызываются "пакетами". Вы можете рассмотреть возможность реализации пакетных операций, которые можно использовать из программы импорта файлов. Это будет рассматриваться как одно событие в вашем посреднике и будет вызывать одно событие.

Я думаю, что будет очень сложно реализовать какое-то разумное решение, если вы не сможете внести изменения, описанные выше - вы можете попытаться обернуть своего издателя в «кэширующий» издатель, который бы реализовал некоторую эвристику для кэширования событий, если они идут в очередях. Проще всего было бы кэшировать событие, если в данный момент обрабатывается другое событие того же типа (поэтому ваш пакет вызовет как минимум 2 события - одно в самом начале и одно в конце). Вы можете подождать короткое время и вызвать событие только тогда, когда следующее событие не наступило в течение этого времени, но вы получите временную задержку, даже если в конвейере есть одно событие. Вы также должны быть уверены, что время от времени будете вызывать событие, даже если существует постоянная очередь событий, иначе издатели потенциально будут голодать.

Второй вариант сложен в реализации и будет содержать эвристику, которая может быть очень неправильной ...

0 голосов
/ 24 августа 2010

Вот одна идея, которая только что выпала из моей головы.Я не знаю, насколько это возможно, и не вижу очевидного способа сделать его более общим, но это может быть началом.Все, что он делает, это обеспечивает буфер для событий нажатия кнопки (при необходимости замените его на ваше событие).

class ButtonClickBuffer
{
    public event EventHandler BufferedClick;

    public ButtonClickBuffer(Button button, int queueSize)
    {
        this.queueSize= queueSize;
        button.Click += this.button_Click;
    }

    private int queueSize;
    private List<EventArgs> queuedEvents = new List<EventArgs>();

    private void button_Click(object sender, EventArgs e)
    {
        queuedEvents.Add(e);
        if (queuedEvents.Count >= queueSize)
        {
            if (this.BufferedClick!= null)
            {
                foreach (var args in this.queuedEvents)
                {
                    this.BufferedClick(sender, args);
                }
                queuedEvents.Clear();
            }
        }
    }
}

Поэтому ваш подписчик вместо подписки будет:буфер, указывающий, сколько событий ждать:

ButtonClickBuffer buffer = new ButtonClickBuffer(this.button1, 5);
buffer.BufferedClick += this.button1_Click;

Он работает в простой тестовой форме, которую я поднял, но это далеко не готово к работе!хотите дождаться события, чтобы увидеть, существует ли очередь ожидания, что и делает.Вы можете заменить логику внутри буфера, чтобы создать новый поток, который отслеживает очередь и отправляет события по мере необходимости.Бог знает, какие проблемы с потоками и блокировками могут возникнуть из-за этого!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...