Consumer Producer - поток производителя никогда не выполняет назначенную функцию - PullRequest
0 голосов
/ 21 ноября 2018

У меня есть решение .NET Core Web API.В каждом вызове мне нужно выполнить некоторые операции с базой данных.Проблема в том, что несколько соединений БД открываются и закрываются.Поэтому, чтобы избежать этого, я хочу реализовать Очередь объектов для отправки в базу данных, а затем хочу, чтобы отдельный поток выполнял операцию БД.Я попробовал некоторый код, как показано ниже.Но здесь Поток потребителя никогда не выполняет назначенную функцию.Для Producer отдельного потока нет, я просто наполняю очередь объектом.Какие модификации я должен сделать?Нужны некоторые рекомендации, так как я новичок в Threading.

  public static class BlockingQueue
{
    public static Queue<WebServiceLogModel> queue;
    static BlockingQueue()
    {
        queue = new Queue<WebServiceLogModel>();

    }

    public static object Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            return queue.Dequeue();
        }
    }
    public static void Enqueue(WebServiceLogModel webServiceLog)
    {
        lock (queue)
        {
            queue.Enqueue(webServiceLog);
            Monitor.Pulse(queue);
        }
    }

    public static void ConsumerThread(IConfiguration configuration)
    {
        WebServiceLogModel webServiceLog = (WebServiceLogModel)Dequeue();
        webServiceLog.SaveWebServiceLog(configuration);
    }

   public static void ProducerThread(WebServiceLogModel webServiceLog)
    {
        Enqueue(webServiceLog);
         Thread.Sleep(100);
    }
}

Я создал и запустил тему в StartUp.cs:

    public Startup(IConfiguration configuration)
    {
        Thread t = new Thread(() => BlockingQueue.ConsumerThread(configuration));
        t.Start();
    }

В Controller я написал код для подачи в очередь:

    [HttpGet]
    [Route("abc")]
    public IActionResult GetData()
    {
        BlockingQueue.ProducerThread(logModel);
        return StatusCode(HttpContext.Response.StatusCode = (int)HttpStatusCode.NotFound, ApplicationConstants.Message.NoBatchHistoryInfo);
    }

Ответы [ 2 ]

0 голосов
/ 21 ноября 2018

Прежде всего, старайтесь избегать static классов и методов.Используйте шаблон singleton в этом случае (и если вам это действительно нужно).Во-вторых, старайтесь избегать lock, Monitor - эти примитивы параллелизма значительно снижают вашу производительность.В такой ситуации вы можете использовать BlockingCollection <> в качестве 'Adam G', упомянутого выше, или вы можете разработать собственное решение.

public class Service : IDisposable
{
    private readonly BlockingCollection<WebServiceLogModel> _packets =
        new BlockingCollection<WebServiceLogModel>();
    private Task _task;
    private volatile bool _active;
    private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(1);

    public Service()
    {
        _active = true;
        _task = ExecTaskInternal();
    }

    public void Enqueue(WebServiceLogModel model)
    {
        _packets.Add(model);
    }

    public void Dispose()
    {
        _active = false;
    }

    private async Task ExecTaskInternal()
    {
        while (_active)
        {
            if (_packets.TryTake(out WebServiceLogModel model))
            {
                // TODO: whatever you need
            }
            else
            {
                await Task.Delay(WaitTimeout);
            }
        }
    }
}

public class MyController : Controller
{
    [HttpGet]
    [Route("abc")]
    public IActionResult GetData([FromServices] Service service)
    {
        // receive model form somewhere
        WebServiceLogModel model = FetchModel();
        // enqueue model
        service.Enqueue(model);
        // TODO: return what you need
    }
}

И при запуске:

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<Service>();
        // TODO: other init staffs
    }
}

Вы дажеможете добавить методы Start / Stop к службе вместо реализации IDisposable и запустить службу в классе запуска в методе Configure(IApplicationBuilder app).

0 голосов
/ 21 ноября 2018

Я думаю, что ваш потребительский поток выполняется только один раз, если в очереди что-то есть, а затем сразу же возвращается.Если вы хотите, чтобы поток выполнял работу в фоновом режиме, который запускается только один раз, он никогда не должен возвращаться и должен перехватывать все исключения.Ваш поток из BlockingQueue.ConsumerThread вызывается один раз в Stratup и возвращается.

Также имейте в виду, что выполнение такого решения небезопасно.ASP.NET не гарантирует, что фоновые потоки будут работать, если не поступают запросы. Ваш пул приложений может перезапускаться (и по умолчанию он перезапускается через 20 минут бездействия или каждые 27 часов), поэтому есть вероятность, что ваш фонкод не будет выполнен для некоторых элементов очереди.

Кроме того, хотя он не решает все проблемы, я бы предложил использовать https://www.hangfire.io/ для выполнения фоновых задач на сервере ASP.NET.Он имеет постоянный уровень, может повторять задания и имеет простые API.В своем обработчике запросов вы можете отправлять новые задания в Hangfire, а затем иметь только 1 поток обработчика заданий.

...