Как реализовать асинхронную потоковую передачу данных в. Net Сработала основная служебная шина Azure Функция обработки огромных данных, чтобы не получить OutOfMemoryException? - PullRequest
0 голосов
/ 05 августа 2020

У меня запущена служебная шина Azure Функция , которая прослушивает сообщения, содержащие только строки URL-адресов blob с данными JSON, каждая из которых имеет размер не менее 10 МБ .

Очередь сообщений почти в реальном времени (если я использую правильный термин), поэтому производители продолжают помещать сообщения в очередь с такой частотой, чтобы в очереди всегда были данные для обработки.

I разработали решение, но в большинстве случаев оно получает OutOfMemoryException. В текущем решении последовательно задействованы следующие шаги:

  1. Потребление сообщения
  2. Загрузка файла из URL-адреса в используемом сообщении во временную папку
  3. Прочтите весь файл в виде строки
  4. Десериализовать его в объект
  5. Разделить на части для предоставления Пн go Лимит массового обновления
  6. Массовое обновление до Пн go

Я попытался решить OutOfMemoryException, и я подумал, что это потому, что моя функция / потребитель не в таком же темпе с производителем, поэтому я думаю, что в момент t1, когда он получает первое сообщение и обработайте его, а затем, пока оно загружается в mon go, функция продолжает получать сообщения, и они накапливаются в памяти и ожидают загрузки.

Это мои рассуждения правильно?

Таким образом, я думаю, что если бы я мог реализовать решение для потоковой передачи, начиная с №3, чтение из файла путем фрагментации и помещения его в поток, я бы предотвратил сохранение памяти p растет и также сокращает время. У меня в основном Java фон, и я каким-то образом знаю, что с custom iterator/spliterator/iterable можно выполнять потоковую и асинхронную обработку.

Как я могу выполнять асинхронную потоковую передачу данных с. Net Core в Azure Функция?

Есть ли другие подходы к решению этой проблемы?

namespace x.y.Z
{
    public class MyFunction
    {
        //...

        [FunctionName("my-func")]
        public async Task Run([ServiceBusTrigger("my-topic", "my-subscription", Connection = "AzureServiceBus")] string message, ILogger log, ExecutionContext context)
        {
            var data = new PredictionMessage();

            try
            {
                data = myPredictionService.genericDeserialize(message);
                await myPredictionService.ValidateAsync(data);
                await myPredictionService.AddAsync(data);
            }
            catch (Exception ex)
            {
                //...
            }
        }
    }
}

public class PredictionMessage
    {
        public string BlobURL { get; set; }
    }

namespace x.y.z.Prediction
{
    public abstract class BasePredictionService<T> : IBasePredictionService<T> where T : PredictionMessage, new()
    {
        protected readonly ILogger log;
        private static JsonSerializer serializer;


        public BasePredictionService(ILogger<BasePredictionService<T>> log)
        {
            this.log = log;
            serializer = new JsonSerializer();
        }
        public async Task ValidateAsync(T message)
        {
            //...
        }

        public T genericDeserialize(string message)
        {
            return JsonConvert.DeserializeObject<T>(message);
        }

        public virtual Task AddAsync(T message)
        {
            throw new System.NotImplementedException();
        }

        public async Task<string> SerializePredictionResult(T message)
        {
            var result = string.Empty;

            using (WebClient client = new WebClient())
            {
                var tempPath = Path.Combine(Path.GetTempPath(), DateTime.Now.Ticks + ".json");
                Uri srcPath = new Uri(message.BlobURL);

                await client.DownloadFileTaskAsync(srcPath, tempPath);

                using (FileStream fs = File.Open(tempPath, FileMode.Open, FileAccess.Read, FileShare.Read))
                {
                    using (BufferedStream bs = new BufferedStream(fs))
                    using (StreamReader sr = new StreamReader(bs))
                    {
                        result = sr.ReadToEnd();
                    }
                }

                Task.Run(() =>
                {
                    File.Delete(tempPath);
                });

                return result;
            }
        }

        protected TType StreamDataDeserialize<TType>(string streamResult)
        {
            var body = default(TType);

            using (MemoryStream stream = new MemoryStream(Encoding.Default.GetBytes(streamResult)))
            {
                using (StreamReader streamReader = new StreamReader(stream))
                {
                    body = (TType)serializer.Deserialize(streamReader, typeof(TType));
                }
            }

            return body;
        }


        protected List<List<TType>> Split<TType>(List<TType> list, int chunkSize = 1000)
        {
            List<List<TType>> retVal = new List<List<TType>>();

            while (list.Count > 0)
            {
                int count = list.Count > chunkSize ? chunkSize : list.Count;
                retVal.Add(list.GetRange(0, count));
                list.RemoveRange(0, count);
            }

            return retVal;
        }
    }
}

namespace x.y.z.Prediction
{
    public class MyPredictionService : BasePredictionService<PredictionMessage>, IMyPredictionService
    {
        private readonly IMongoDBRepository<MyPrediction> repository;

        public MyPredictionService(IMongoDBRepoFactory mongoDBRepoFactory, ILogger<MyPredictionService> log) : base(log)
        {
            repository = mongoDBRepoFactory.GetRepo<MyPrediction>();
        }

        public override async Task AddAsync(PredictionMessage message)
        {
            string streamResult = await base.SerializePredictionResult(message);

            var body = base.StreamDataDeserialize<List<MyPrediction>>(streamResult);

            if (body != null && body.Count > 0)
            {
                var chunkList = base.Split(body);

                await BulkUpsertProcess(chunkList);
            }
        }

        private async Task BulkUpsertProcess(List<List<MyPrediction>> chunkList)
        {
            foreach (var perChunk in chunkList)
            {
                var filterContainers = new List<IDictionary<string, object>>();
                var updateContainer = new List<IDictionary<string, object>>();

                foreach (var item in perChunk)
                {
                    var filter = new Dictionary<string, object>();
                    var update = new Dictionary<string, object>();

                    filter.Add(/*...*/);
                    filterContainers.Add(filter);

                    update.Add(/*...*/);
                    updateContainer.Add(update);
                }

                await Task.Run(async () =>
                {
                    await repository.BulkUpsertAsync(filterContainers, updateContainer);
                });
            }
        }
    }
}

   
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...