C # ConcurrentBag - Как безопасно очистить каждый добавленный N объектов - PullRequest
0 голосов
/ 09 января 2019

Я внедряю собственный регистратор, который регистрирует в AWS CloudWatch, и это асинхронный регистратор. Регистратор работает на моей службе WCF, и несколько потоков пытаются войти в систему одновременно. Я создал потокобезопасный одноэлементный доступ для ведения журналов, но AWS CloudWatch SDK не может отправить следующий журнал, пока он не завершит отправку текущего.

Так что моя идея состоит в том, чтобы объединить сообщения журнала в ConcurrentBag, и как только он достигнет нескольких N членов, я отправлю журналы в пакете в CloudWatch (SDK позволяет это, так что все хорошо).

Процесс идет следующим образом: Любой поток вызывает регистратор> Регистратор добавляет сообщение в ConcurrentBag> Поток отправителя продолжает проверять счет (или один и тот же)> Если счет>> N, отправлять пакетный журнал и блокировать другие потоки от добавления во время отправки> повторить

Процесс не должен ждать ответа, чтобы начать добавление снова в ConcurrentBag.

Каков наилучший подход для этого?

1 Ответ

0 голосов
/ 09 января 2019

Моя идея что-то вроде бэкбуфера. Как только одна сумка заполнена, вы заменяете ее в процессе атомара и обрабатываете его. У меня есть наброски такого класса, который мог бы справиться с этим:

public class DualBag<T> : IDisposable
{
    private ConcurrentBag<T> _bag;
    private int _threshold;
    private ManualResetEventSlim _exchangeEvent;
    private CancellationTokenSource _cts;

    public DualBag(int threshold)
    {
        _bag = new ConcurrentBag<T>();
        _threshold = threshold;
        _exchangeEvent = new ManualResetEventSlim();
        _cts = new CancellationTokenSource();
        Task.Run(() => Consume());
    }

    public void Add(T item)
    {
        _bag.Add(item);

        if (_bag.Count > _threshold)
        {
            _exchangeEvent.Set();
        }
    }

    private async Task Consume()
    {
        while (!_cts.IsCancellationRequested)
        {
            _exchangeEvent.Wait();
            _exchangeEvent.Reset();
            var _replacementBag = new ConcurrentBag<T>();
            var bag = Interlocked.Exchange(ref _bag, _replacementBag);
            await ConsumeBag(bag);
        }

        if (!_bag.IsEmpty)
            await ConsumeBag(_bag);
    }

    public void Dispose()
    {
        _cts.Cancel();
    }

    public Task ConsumeBag(ConcurrentBag<T> bag)
    {
        // post entries to the api
    }
}

Это может быть уточнено некоторыми пунктами. Если вы используете библиотеку Стивена Клири asyncEx, вы можете использовать AsyncManualResetEvent, чтобы сделать цикл обработки истинно асинхронным и избавиться от потока с наибольшим временем блокировки.

...