У меня запущена служебная шина Azure Функция , которая прослушивает сообщения, содержащие только строки URL-адресов blob с данными JSON, каждая из которых имеет размер не менее 10 МБ .
Очередь сообщений почти в реальном времени (если я использую правильный термин), поэтому производители продолжают помещать сообщения в очередь с такой частотой, чтобы в очереди всегда были данные для обработки.
I разработали решение, но в большинстве случаев оно получает OutOfMemoryException
. В текущем решении последовательно задействованы следующие шаги:
- Потребление сообщения
- Загрузка файла из URL-адреса в используемом сообщении во временную папку
- Прочтите весь файл в виде строки
- Десериализовать его в объект
- Разделить на части для предоставления Пн go Лимит массового обновления
- Массовое обновление до Пн go
Я попытался решить OutOfMemoryException
, и я подумал, что это потому, что моя функция / потребитель не в таком же темпе с производителем, поэтому я думаю, что в момент t1, когда он получает первое сообщение и обработайте его, а затем, пока оно загружается в mon go, функция продолжает получать сообщения, и они накапливаются в памяти и ожидают загрузки.
Это мои рассуждения правильно?
Таким образом, я думаю, что если бы я мог реализовать решение для потоковой передачи, начиная с №3, чтение из файла путем фрагментации и помещения его в поток, я бы предотвратил сохранение памяти p растет и также сокращает время. У меня в основном Java фон, и я каким-то образом знаю, что с custom iterator/spliterator/iterable
можно выполнять потоковую и асинхронную обработку.
Как я могу выполнять асинхронную потоковую передачу данных с. Net Core в Azure Функция?
Есть ли другие подходы к решению этой проблемы?
namespace x.y.Z
{
public class MyFunction
{
//...
[FunctionName("my-func")]
public async Task Run([ServiceBusTrigger("my-topic", "my-subscription", Connection = "AzureServiceBus")] string message, ILogger log, ExecutionContext context)
{
var data = new PredictionMessage();
try
{
data = myPredictionService.genericDeserialize(message);
await myPredictionService.ValidateAsync(data);
await myPredictionService.AddAsync(data);
}
catch (Exception ex)
{
//...
}
}
}
}
public class PredictionMessage
{
public string BlobURL { get; set; }
}
namespace x.y.z.Prediction
{
public abstract class BasePredictionService<T> : IBasePredictionService<T> where T : PredictionMessage, new()
{
protected readonly ILogger log;
private static JsonSerializer serializer;
public BasePredictionService(ILogger<BasePredictionService<T>> log)
{
this.log = log;
serializer = new JsonSerializer();
}
public async Task ValidateAsync(T message)
{
//...
}
public T genericDeserialize(string message)
{
return JsonConvert.DeserializeObject<T>(message);
}
public virtual Task AddAsync(T message)
{
throw new System.NotImplementedException();
}
public async Task<string> SerializePredictionResult(T message)
{
var result = string.Empty;
using (WebClient client = new WebClient())
{
var tempPath = Path.Combine(Path.GetTempPath(), DateTime.Now.Ticks + ".json");
Uri srcPath = new Uri(message.BlobURL);
await client.DownloadFileTaskAsync(srcPath, tempPath);
using (FileStream fs = File.Open(tempPath, FileMode.Open, FileAccess.Read, FileShare.Read))
{
using (BufferedStream bs = new BufferedStream(fs))
using (StreamReader sr = new StreamReader(bs))
{
result = sr.ReadToEnd();
}
}
Task.Run(() =>
{
File.Delete(tempPath);
});
return result;
}
}
protected TType StreamDataDeserialize<TType>(string streamResult)
{
var body = default(TType);
using (MemoryStream stream = new MemoryStream(Encoding.Default.GetBytes(streamResult)))
{
using (StreamReader streamReader = new StreamReader(stream))
{
body = (TType)serializer.Deserialize(streamReader, typeof(TType));
}
}
return body;
}
protected List<List<TType>> Split<TType>(List<TType> list, int chunkSize = 1000)
{
List<List<TType>> retVal = new List<List<TType>>();
while (list.Count > 0)
{
int count = list.Count > chunkSize ? chunkSize : list.Count;
retVal.Add(list.GetRange(0, count));
list.RemoveRange(0, count);
}
return retVal;
}
}
}
namespace x.y.z.Prediction
{
public class MyPredictionService : BasePredictionService<PredictionMessage>, IMyPredictionService
{
private readonly IMongoDBRepository<MyPrediction> repository;
public MyPredictionService(IMongoDBRepoFactory mongoDBRepoFactory, ILogger<MyPredictionService> log) : base(log)
{
repository = mongoDBRepoFactory.GetRepo<MyPrediction>();
}
public override async Task AddAsync(PredictionMessage message)
{
string streamResult = await base.SerializePredictionResult(message);
var body = base.StreamDataDeserialize<List<MyPrediction>>(streamResult);
if (body != null && body.Count > 0)
{
var chunkList = base.Split(body);
await BulkUpsertProcess(chunkList);
}
}
private async Task BulkUpsertProcess(List<List<MyPrediction>> chunkList)
{
foreach (var perChunk in chunkList)
{
var filterContainers = new List<IDictionary<string, object>>();
var updateContainer = new List<IDictionary<string, object>>();
foreach (var item in perChunk)
{
var filter = new Dictionary<string, object>();
var update = new Dictionary<string, object>();
filter.Add(/*...*/);
filterContainers.Add(filter);
update.Add(/*...*/);
updateContainer.Add(update);
}
await Task.Run(async () =>
{
await repository.BulkUpsertAsync(filterContainers, updateContainer);
});
}
}
}
}