C # Monitor / Semaphore Concurrency Produce-Consumer для буфера - PullRequest
0 голосов
/ 15 февраля 2012

Я работаю над решением проблемы с типичной проблемой производителя-потребителя.У меня есть несколько производителей и один потребитель.Существует n потоков производителей, каждый из которых вызывает SetOne (OrderObject order) , а потребитель вызывает GetOne () .Класс Buffer используется в качестве буфера, который содержит Buffer .По какой-то причине нижеприведенная настройка работает иногда, но не всегда использует все ячейки.Я включил все участвующие классы Клиент, Сервер и Буфер .Кроме того, я могу показать простой прототип, на котором запущен этот прототип.К сведению - используемый метод заключается в том, чтобы сначала инициализировать количество семафоров до того же размера, что и используемый буфер, а затем, когда буфер открывается, найти открытую ячейку и затем выполнить операцию с этой ячейкой.

public class Buffer
{
    public static OrderObject[] BufferCells;
    public static Semaphore _pool { get; set; }
    public static void SetOne(OrderObject order)
    {

        _pool.WaitOne();
        try
        {
            Monitor.Enter(BufferCells);
            for (int i = 0; i < BufferCells.Length - 1; i++)
            {
                BufferCells[i] = order;
                Console.WriteLine(String.Format("Client {0} Produced {1}", BufferCells[i].Id, BufferCells[i].Id));
            }
        }
        finally
        {
            Monitor.Exit(BufferCells);
            _pool.Release();
        } 
    }

    public static OrderObject GetOne()
    {
        _pool.WaitOne();
        OrderObject value = null;
        try
        {
            Monitor.Enter(BufferCells);
            for (int i = 0; i < BufferCells.Length - 1; i++)
            {
                if (BufferCells[i].Id != "-1")
                {
                    value = BufferCells[i];
                    BufferCells[i] = new OrderObject() { Id = "-1" }; /*Clear Cell*/
                    Console.WriteLine(String.Format("        Server Consumed {0}", value.Id));
                    break;
                }
            }
        }
        finally
        {
            Monitor.Exit(BufferCells);
            _pool.Release();
        }
        return value;
    }
 }


public class Client
{
    public int Id {get;set;}

    public void Run()
    {
         /*Produce*/
         Buffer.SetOne(Id);

    }
}

public class Server
{
    public void Run()
    {
        while(true)
        {
             Buffer.GetOne();
        }
    }
}

public class Program
{
    public static void Main(string[] args)
    {
        /*Initialize 2 Open Semaphores*/
        int numCells = 2;
        Semaphore pool = new Semaphore(numCells, numCells);

        /*Initialize BufferCells with Empty OrderObjects*/
        List<OrderObject> OrderObjects = new List<OrderObject>();
        for (var i = 0; i < numCells; i++)
        {
            OrderObjects.Add(new OrderObject() { Id = "-1" });
        }
        Buffer.BufferCells = OrderObjects.ToArray();

        /*Initialize Consumer Thread*/
        Server server = new Server(pool);
        Thread serverThread = new Thread(new ThreadStart(server.Run));


        /*Initialize Producer Objects*/
        List<Client> clients = new List<Client>();
        for (int i = 0; i <= 20; i++)
        {
            /*Create 5000 Clients*/
            Client client = new Client(i.ToString(), pool, new OrderObject() { Id = i.ToString() });
            clients.Add(client);
        }

        /*Start Each Client Thread*/
        List<Thread> clientThreads = new List<Thread>();
        foreach (var client in clients)
        {
            Thread t = new Thread(new ThreadStart(client.Run));
            clientThreads.Add(t);
        }

        /*Start Server Thread*/
        serverThread.Start();

        /*Start Each Producer Thread*/
        clientThreads.ForEach(p => p.Start());

        /*Start Consumer Thread*/
        Console.ReadLine();
    }
}

Я предполагаю, что столкнулся с одной из следующих проблем: тупик, живая блокировка или голодание.По какой-то причине Сервер не может использовать все объекты заказа, созданные и добавленные в буфер ячеек.Не уверен, что это за исправление.

Ответы [ 2 ]

1 голос
/ 15 февраля 2012

Хорошо, давайте разберем, что пытается сделать этот код ..

public class Buffer
{
    public static OrderObject[] BufferCells;
    public static Semaphore _pool { get; set; }

// Набор Один

    public static void SetOne(OrderObject order)
    {

// Зачем вам здесь семафор?

        _pool.WaitOne();
        try
        {

// Семафор, является избыточным, поскольку Monitor.Enter является более строгой блокировкой.

            Monitor.Enter(BufferCells);

// А? Я думал, что это должно быть SetOne? Не установлено все? Я могу только предположить, что вы намеревались установить одну из ячеек, в то время как остальные были доступны для настройки или получения. Очередь кажется более подходящей структурой данных, если это то, чего вы пытаетесь достичь. Еще лучше BlockingCollection, который также имеет включенную механику блокировки / блокировки.

            for (int i = 0; i < BufferCells.Length - 1; i++)
            {

                BufferCells[i] = order;
                Console.WriteLine(String.Format("Client {0} Produced {1}", BufferCells[i].Id, BufferCells[i].Id));
            }
        }
        finally
        {
            Monitor.Exit(BufferCells);
            _pool.Release();
        } 
    }

    public static OrderObject GetOne()
    {

// Опять же, этот семафор здесь не очень полезен

        _pool.WaitOne();
        OrderObject value = null;
        try
        {

// Потому что опять монитор имеет более ограничительную блокировку

            Monitor.Enter(BufferCells);
            for (int i = 0; i < BufferCells.Length - 1; i++)
            {
                if (BufferCells[i].Id != "-1")
                {
                    value = BufferCells[i];
                    BufferCells[i] = new OrderObject() { Id = "-1" }; /*Clear Cell*/
                    Console.WriteLine(String.Format("        Server Consumed {0}", value.Id));
                    break;
                }
            }
        }
        finally
        {
            Monitor.Exit(BufferCells);
            _pool.Release();
        }
        return value;
    }
 }

Подводя итог:

  • В этом коде используется избыточная блокировка, которая не нужна
  • Этот код устанавливает все ячейки в буфере одним махом под эксклюзивной блокировкой, которая, кажется, в первую очередь нарушает назначение буфера
  • Похоже, что этот код пытается достичь, уже реализован в BlockingCollection. Не нужно изобретать велосипед! :)

Удачи!

0 голосов
/ 15 февраля 2012

Я собираюсь предположить, что монитор должен защищать доступ к массиву, а семафор должен выполнять подсчет, да?

Если это так, вам не следует вызывать '_pool.Release ()'в разделе finally getOne (), и вы не должны вызывать _pool.WaitOne () в верхней части setOne ().Работа производителей заключается в том, чтобы сигнализировать семафор после того, как он нажал на объект, а работа потребителей - дождаться семафора, прежде чем нажать на объект.

Aarrgghh!

'Используемый метод:чтобы сначала инициализировать количество семафоров с тем же размером используемого буфера, '

Если вы хотите неограниченную очередь, инициализируйте семафор до 0.

Если вы хотите ограниченную очередь,как намекает ваш текст выше, вам нужно два семафора (и монитора) - один «I» для подсчета элементов в очереди, один «S» для подсчета оставшихся в очереди пробелов,Инициализируйте I к 0 и S к размеру очереди.В производителе дождитесь S, заблокируйте, нажмите, разблокируйте, сигнал I. У потребителя дождитесь I, заблокируйте, нажмите, разблокируйте, сигнал S.

...