Параллельный производитель / потребитель, использующий ConcurrentQueue / BlockingCollection - PullRequest
1 голос
/ 07 мая 2019

На работе нам нужно асинхронно записывать данные JSON, которые мы получаем от различных конечных точек.Мы писали это простым и понятным способом.Но это оказывается медленно.Итак, мы хотим переключиться на модель «производитель / потребитель».

BlockingCollection, казалось, вполне соответствовал требованиям, поэтому я создал класс, который использовал BlockingCollection вот так

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using RestSharp;

namespace Test
{
    public class JsonRecorder : IJsonRecorder, IDisposable
    {
        private String _jsonFileName { get; set; }
        private DateTime _jsonWriterDate { get; set; } = DateTime.MinValue;
        private readonly JsonSerializerSettings _jsonDateSerializerSettings = new JsonSerializerSettings {DateFormatString = "yyyy-MM-ddTHH:mm:ss.fffZ"};
        private BlockingCollection<string> _itemsToWriteQueue = new BlockingCollection<string>();
        private Boolean _disposed = false;
        private Boolean _ShouldConsumerProcessRun = false;
        private Boolean _isStarted = false;
        private Task _dequeuerTask;
        private object _syncLock = new object();

        public String Name { get; }
        public Exchange Exchange { get; }
        public string FilePath { get;  }
        public ITimeProvider TimeProvider { get; }
        private ISimpleLogService LogService { get; }

        public JsonRecorder(String name, Exchange exchange, [NotNull] ISimpleLogService simpleLogService, String filePath)
            :this(name, exchange, simpleLogService, filePath, new DefaultTimeProvider())
        {
        }

        public JsonRecorder(String name, Exchange exchange, [NotNull] ISimpleLogService simpleLogService, String filePath, [NotNull] ITimeProvider timeProvider)
        {
            Exchange = exchange;
            Name = name;
            LogService = simpleLogService ?? throw new ArgumentNullException(nameof(simpleLogService));
            FilePath = filePath;
            TimeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        }

        public Boolean InitJsonAuditFile()
        {
            try
            {
                var now = TimeProvider.DateTimeUtcNow;
                if (_jsonWriterDate.Hour == now.Hour)
                    return true;

                if (!String.IsNullOrEmpty(_jsonFileName))
                {
                    ThreadPool.QueueUserWorkItem(_ => { ZipJsonFile(_jsonFileName); });
                    //ZipFileTask.Start();
                }

                _jsonWriterDate = now;
                var directoryName = $"{FilePath}/{_jsonWriterDate:yyyyMMdd}";
                if (!Directory.Exists(directoryName))
                    Directory.CreateDirectory(directoryName);

                _jsonFileName = $@"{directoryName}/{_jsonWriterDate:yyyyMMdd_HHmmss}_{Name}.txt";
                return true;
            }
            catch (Exception ex)
            {
                LogService.LogException(this, LogCategory.GW, Exchange, ex);
            }
            return false;
        }

        public void ZipJsonFile(String fileName)
        {
            if (String.IsNullOrEmpty(fileName))
            {
                throw new ArgumentNullException(nameof(fileName));
            }
            try
            {
                using (var zip = ZipFile.Open($"{fileName}.zip", ZipArchiveMode.Create))
                {
                    zip.CreateEntryFromFile(fileName, Path.GetFileName(fileName));
                }
                File.Delete(fileName);
            }
            catch (Exception ex)
            {
                LogService.LogException(this, LogCategory.GW, Exchange, ex);
            }
        }

        public void JsonRecord(IRestClient client, Dictionary<String, String> body)
        {
            try
            {
                var record = new
                {
                    date = TimeProvider.DateTimeUtcNow,
                    url = client.BaseUrl,
                    body = body?.Select(parameter => new
                    {
                        name = parameter.Key,
                        value = parameter.Value,
                    })
                };
                _itemsToWriteQueue.Add(JsonConvert.SerializeObject(record, _jsonDateSerializerSettings));
            }
            catch (Exception)
            {
                // ignored
            }
        }



        public void JsonRecord(String stringifiedResponse)
        {
            try
            {
                _itemsToWriteQueue.Add(stringifiedResponse);
            }
            catch (Exception ex)
            {
                LogService.LogException(this, LogCategory.GW, Exchange, ex);
            }
        }


        public void Stop()
        {
            lock (_syncLock)
            {
                _itemsToWriteQueue.CompleteAdding();
                _ShouldConsumerProcessRun = false;
                _dequeuerTask?.Wait(TimeSpan.FromSeconds(5));
            }
        }

        public bool Start()
        {
            lock (_syncLock)
            {
                if (!_isStarted)
                {
                    _isStarted = true;
                    _dequeuerTask = Task.Run(() =>
                    {
                        Thread.CurrentThread.Name = "JsonDequeuerTask";
                        RunConsumerProcess();
                    });
                }
                return true;
            }
        }

        /// <inheritdoc />
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        private void RunConsumerProcess()
        {
            _ShouldConsumerProcessRun = true;
            while (_ShouldConsumerProcessRun && !_itemsToWriteQueue.IsCompleted)
            {
                InitJsonAuditFile();

                string itemToWriteToFile = null;
                try
                {
                    itemToWriteToFile = _itemsToWriteQueue.Take();
                }
                catch (InvalidOperationException) { }

                if (itemToWriteToFile != null)
                {
                    using (var stream = File.Open(_jsonFileName, FileMode.Append, FileAccess.Write))
                    {
                        using (var sw = new StreamWriter(stream))
                        {
                            sw.WriteLine(itemToWriteToFile);
                        }
                    }
                }
            }
        }

        private void Dispose(bool disposing)
        {
            if (_disposed)
                return;

            if (disposing)
            {
                Stop();
            }

            _disposed = true;
        }
    }
}

Однако когда язапустите этот код на реальной виртуальной машине, мы увидим память, если достигнет 2G.Я видел это: .Net Concurrent BlockingCollection имеет утечку памяти? Что должно быть исправлено в .NET 4.5 (у нас работает .NET 4.7.2), и я также видел сообщение тоже ConcurrentQueue, держась за несколько устаревших элементов

По-прежнему виден огромный отпечаток памяти.

Итак, мы поменялись местами с помощью этого

public class BlockingQueueSlim<T>
{
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    private static readonly TimeSpan MinWait = TimeSpan.FromMilliseconds(1);


    public void Add(T item)
    {
        _queue.Enqueue(item);
        _autoResetEvent.Set();
    }

    public bool TryPeek(out T result)
    {
        return _queue.TryPeek(out result);
    }

    public T Take()
    {
        T item;
        while (!_queue.TryDequeue(out item))
            _autoResetEvent.WaitOne();
        return item;
    }

    public bool TryTake(out T item, TimeSpan patience)
    {
        if (_queue.TryDequeue(out item))
            return true;
        var stopwatch = Stopwatch.StartNew();
        while (stopwatch.Elapsed < patience)
        {
            if (_queue.TryDequeue(out item))
                return true;
            var patienceLeft = (patience - stopwatch.Elapsed);
            if (patienceLeft <= TimeSpan.Zero)
                break;
            else if (patienceLeft < MinWait)
                // otherwise the while loop will degenerate into a busy loop,
                // for the last millisecond before patience runs out
                patienceLeft = MinWait;
            _autoResetEvent.WaitOne(patienceLeft);
        }

        return false;
    }

    public int CurrentItemCount => _queue.Count;

}

Где я используюПримерно так:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.IO.Compression;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using RestSharp;

namespace Test
{
    public class JsonRecorder : IJsonRecorder, IDisposable
    {
        private String _jsonFileName { get; set; }
        private DateTime _jsonWriterDate { get; set; } = DateTime.MinValue;
        private readonly JsonSerializerSettings _jsonDateSerializerSettings = new JsonSerializerSettings {DateFormatString = "yyyy-MM-ddTHH:mm:ss.fffZ"};
        private BlockingQueueSlim<string> _itemsToWriteQueue = new BlockingQueueSlim<string>();
        private Boolean _disposed = false;
        private Boolean _ShouldConsumerProcessRun = false;
        private Boolean _isStarted = false;
        private Task _dequeuerTask;
        private object _syncLock = new object();
        private long _seqId = 0;

        public String Name { get; }
        public Exchange Exchange { get; }
        public string FilePath { get;  }
        public ITimeProvider TimeProvider { get; }
        private ISimpleLogService LogService { get; }

        public JsonRecorder(String name, Exchange exchange, [NotNull] ISimpleLogService simpleLogService, String filePath)
            :this(name, exchange, simpleLogService, filePath, new DefaultTimeProvider())
        {
        }

        public JsonRecorder(String name, Exchange exchange, [NotNull] ISimpleLogService simpleLogService, String filePath, [NotNull] ITimeProvider timeProvider)
        {
            Exchange = exchange;
            Name = name;
            LogService = simpleLogService ?? throw new ArgumentNullException(nameof(simpleLogService));
            FilePath = filePath;
            TimeProvider = timeProvider ?? throw new ArgumentNullException(nameof(timeProvider));
        }

        public Boolean InitJsonAuditFile()
        {
            try
            {
                var now = TimeProvider.DateTimeUtcNow;
                if (_jsonWriterDate.Hour == now.Hour)
                    return true;

                if (!String.IsNullOrEmpty(_jsonFileName))
                {
                    ThreadPool.QueueUserWorkItem(_ => { ZipJsonFile(_jsonFileName); });
                    //ZipFileTask.Start();
                }

                _jsonWriterDate = now;
                var directoryName = $"{FilePath}/{_jsonWriterDate:yyyyMMdd}";
                if (!Directory.Exists(directoryName))
                    Directory.CreateDirectory(directoryName);

                _jsonFileName = $@"{directoryName}/{_jsonWriterDate:yyyyMMdd_HHmmss}_{Name}.txt";
                return true;
            }
            catch (Exception ex)
            {
                LogService.LogException(this, LogCategory.GW, Exchange, ex);
            }
            return false;
        }

        public void ZipJsonFile(String fileName)
        {
            if (String.IsNullOrEmpty(fileName))
            {
                throw new ArgumentNullException(nameof(fileName));
            }
            try
            {
                using (var zip = ZipFile.Open($"{fileName}.zip", ZipArchiveMode.Create))
                {
                    zip.CreateEntryFromFile(fileName, Path.GetFileName(fileName));
                }
                File.Delete(fileName);
            }
            catch (Exception ex)
            {
                LogService.LogException(this, LogCategory.GW, Exchange, ex);
            }
        }


        public void JsonRecord(IRestClient client, Dictionary<String, String> body)
        {
            try
            {
                var record = new
                {
                    seqId = Interlocked.Increment(ref _seqId),
                    date = TimeProvider.DateTimeUtcNow,
                    url = client.BaseUrl,
                    body = body?.Select(parameter => new
                    {
                        name = parameter.Key,
                        value = parameter.Value,
                    })
                };
                _itemsToWriteQueue.Add(JsonConvert.SerializeObject(record, _jsonDateSerializerSettings));
            }
            catch (Exception)
            {
                // ignored
            }
        }

        public void JsonRecord(String stringifiedResponse)
        {
            try
            {
                _itemsToWriteQueue.Add(stringifiedResponse);
            }
            catch (Exception ex)
            {
                LogService.LogException(this, LogCategory.GW, Exchange, ex);
            }
        }

        public void Stop()
        {
            lock (_syncLock)
            {
                _isStarted = false;
                _ShouldConsumerProcessRun = false;
                _dequeuerTask?.Wait(TimeSpan.FromSeconds(5));
            }
        }

        public bool Start()
        {
            lock (_syncLock)
            {
                if (!_isStarted)
                {
                    _isStarted = true;
                    _dequeuerTask = Task.Run(() =>
                    {
                        Thread.CurrentThread.Name = "JsonDequeuerTask";
                        RunConsumerProcess();
                    });
                }
                return true;
            }
        }

        /// <inheritdoc />
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        private void RunConsumerProcess()
        {
            _ShouldConsumerProcessRun = true;
            while (_ShouldConsumerProcessRun)
            {
                InitJsonAuditFile();

                string itemToWriteToFile = null;
                try
                {
                    itemToWriteToFile = _itemsToWriteQueue.Take();
                }
                catch (InvalidOperationException) { }

                if (itemToWriteToFile != null)
                {
                    using (var stream = File.Open(_jsonFileName, FileMode.Append, FileAccess.Write))
                    {
                        using (var sw = new StreamWriter(stream))
                        {
                            sw.WriteLine(itemToWriteToFile);
                        }
                    }
                }
            }
        }

        private void Dispose(bool disposing)
        {
            if (_disposed)
                return;

            if (disposing)
            {
                Stop();
            }

            _disposed = true;
        }
    }
}

Однако, в конечном итоге, это потребляет много памяти до 2G.

Я также читал различные посты о том, что в ConcurrentQueue есть утечки памяти.Например, здесь

Я сейчас немного потерян.мне нужно

  • Я могу создавать значения из разных источников (разных потоков)
  • Потребитель может работать в выделенном потоке
  • Я НЕ МОГУ потерять данные
  • Я в порядке, если используется кольцевой буфер (при условии, что я не потеряю данные).Потребитель довольно быстр, поэтому этого не должно быть
  • Требования к памяти регулируются с помощью некоторого параметра размера

Что бы люди посоветовали в качестве жизнеспособного подхода для достижения этого набора требований, так какклассы .NET, похоже, не работают для нас

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...