Сохранить сущности в пакете в базу данных - PullRequest
0 голосов
/ 03 мая 2020

Я создаю веб-API, который хранит сообщения журнала в базе данных. Требования указывают, что мой API должен иметь возможность хранить 900 сообщений в секунду параллельно. Я создал следующий простой метод репозитория для вставки сущностей:

public async Task<int> InsertListAsync(IEnumerable<LogMessage> logMessages)
{
    await _context.LogMessages.AddRangeAsync(logMessages);
    return await _context.SaveChangesAsync();
}

Это код моего контроллера:

public async Task<IActionResult> LogMessage([FromBody] IEnumerable<LogMessageDTO> logMessageModels)
{
    try
    {
        var logMessages = logMessageModels.Select(x => LogMessageDTOMapper.Map(x));
        await _repo.InsertListAsync(logMessages);
        return new OkResult();
    }
    catch (Exception e)
    {
        _logger.LogError(1000,e,"Error while processing LogMessages");
        return BadRequest($"Error while processing LogMessages:{e.Message}{Environment.NewLine}{e.StackTrace}");
    }
}

Этот тест отлично работает для относительно небольшого количества клиентов, отправляющих запросы на в то же время (максимум 90). Однако я написал следующий интеграционный тест (функция GetStringContentForPostRequest в основном создает количество случайно сгенерированных сообщений LogMessages, в данном случае 1):

[Fact]
public void Test_MultipleRequests()
{
    // Arrange
    var client = _factory.CreateClient();
    //Act
    var responses = new List<Task<HttpResponseMessage>>();
    for (int i = 0; i < 900; i++)
    {

        responses.Add(client.PostAsync("/logs", GetStringContentForPostRequest(1)));
    }
    Task.WaitAll(responses.ToArray());
    // Assert
    foreach (var item in responses)
    {
        item.Result.EnsureSuccessStatusCode(); // Status Code 200-299
    }
}

Этот тест отправляет сразу 900 запросов, каждый из которых содержит один LogMessage. Однако этот тест не пройден с сообщением об ошибке:

System.InvalidOperationException: An exception has been raised that is likely due to a transient failure.
 ---> Npgsql.PostgresException (0x80004005): 53300: sorry, too many clients already

Одним из решений может быть увеличение количества разрешенных подключений на моем сервере базы данных Postgres, но есть простой способ сначала собрать все полученные сообщения журнала и сохранить их в базе данных одновременно?

1 Ответ

0 голосов
/ 07 мая 2020

Я решил это, используя ConcurrentQueue в качестве буфера для моих сообщений LogMessages. Вместо того, чтобы напрямую сохранять их в базе данных на моем контроллере, я добавил их в abuffer, который внедряется как одиночный в мой API.

public class Buffer: IBuffer
{
    private ConcurrentQueue<LogMessageDTO> _logMessageDTOs;
    public Buffer()
    {
        _logMessageDTOs = new ConcurrentQueue<LogMessageDTO>();
    }
    public ConcurrentQueue<LogMessageDTO> LogMessageDTOs()
    {
        return _logMessageDTOs;
    }

}

Таким образом, мое действие контроллера теперь выглядит так:

public async Task<IActionResult> LogMessage([FromBody] IEnumerable<LogMessageDTO> logMessageModels)
{
    try
    {
        await Task.Run(() =>
        {
            foreach (var item in logMessageModels)
            {
                _buffer.LogMessageDTOs().Enqueue(item);
            }
        });
        return new OkResult();
    }
    catch (Exception e)
    {
        _logger.LogError(1000, e, "Error while processing LogMessages");
        return BadRequest($"Error while processing LogMessages:{e.Message}{Environment.NewLine}{e.StackTrace}");
    }
}

Что обеспечивает более быстрое время отклика для потребителя, вызывающего этот API.

Затем этот ConcurrentQueue очищается службой TimedHostService, о которой я читал подробнее на этой странице документации Microsoft . По сути, эта служба синхронизированного хоста очищает ConcurrentQueue и сохраняет элементы в БД. Это метод DoWork моего TimedHostService:

private async void DoWork(object state)
{
    _logger.LogInformation("Timed background service is working");
    var logMessageDTOs = _buffer.LogMessageDTOs();
    using (var scope = _services.CreateScope())
    {
        var repository =
            scope.ServiceProvider
                .GetRequiredService<ILogMessageRepository>();
        var mappedLogMessages = new List<LogMessage>();
        while (logMessageDTOs.TryDequeue(out LogMessageDTO logMessageDTO))
        {
            mappedLogMessages.Add(LogMessageDTOMapper.Map(logMessageDTO,_logMessagePrefix));
        }
        await repository.InsertListAsync(mappedLogMessages);
    }
    _logger.LogInformation("Timed background service has stopped working");
}

Это приводит к гораздо меньшему количеству соединений, необходимых для БД, так как не каждое действие контроллера создает новое соединение с базой данных.

...