Попытка реализовать синхронную обратную реакцию очереди - PullRequest
0 голосов
/ 03 декабря 2018

Я пытаюсь создать собственную синхронную очередь с использованием мониторов.Обычно <Я бы использовал уже реализованные очереди, но я новичок в многопоточности и хотел бы посмотреть, что у вас под капотом.Моя логика оборачивает Linked-List внутри класса вдоль максимальной емкости и текущей емкости, плюс флаг, который сигнализирует, что на другом конце нет записывающего устройства (аналогично каналу C). </p>

Как это работает:

  • PUSH : пока текущая емкость максимальная, мы ждем.Иначе, мы добавляем подарок в связанный список и увеличиваем емкость, а затем уведомляем.
  • tryPOP : пока текущая емкость равна 0, подождите.В противном случае сохраните подарок во временной переменной, верните его, затем следует блок finally, который печатает (использует toString () объекта), чтобы распечатать его.Если время ожидания истекло, он вернет ноль, и вызывающий объект должен попытаться снова.(Я сделал это, чтобы предотвратить взаимные блокировки)

Что кажется неправильным: всякий раз, когда я добавляю что-то в очередь, я также печатаю то, что было добавлено, и текущую емкость.Я вижу, что емкость превышает максимальную емкость , и я не могу понять, почему, поскольку все методы синхронизированы и

Теперь для некоторого кода:

public class GiftQueue {
    private final Queue<Gift> factoryGiftQueue;
    private final int queueCapacity;
    private volatile int currentCapacity;
    private volatile boolean writingEndClosed;


    public GiftQueue(Queue<Gift> factoryGiftQueue, int queueCapacity) {
        this.factoryGiftQueue = factoryGiftQueue;
        this.queueCapacity = queueCapacity;
        this.currentCapacity = 0;
        this.writingEndClosed = false;

    }

    public synchronized void push(Gift gift) {
        while (currentCapacity == queueCapacity) {
            try {
                System.out.println("Queue full, awaiting pop...");
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        factoryGiftQueue.add(gift);
        currentCapacity++;
        System.out.println("Gift " + gift + " has been added to the queue. Queue size: " + currentCapacity);
        notifyAll();
    }


    public synchronized Gift tryPop(int timeoutNanos) {
        while (currentCapacity == 0) {
            try {
                System.out.println("Queue is empty, awaiting new push...");
                wait();
                if (currentCapacity == 0) {
                    System.out.println("Pop ABORTED");
                    return null;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }


        }
        Gift temp = factoryGiftQueue.poll();
        --currentCapacity;

        notifyAll();

        try {
            return temp;
        } finally {
            System.out.println("Gift has" + temp + " been pulled from queue. Queue size " + currentCapacity);
        }


    }

    public synchronized void close() {
        System.out.println("Writing end has been closed !");
        writingEndClosed = true;
    }


    public synchronized boolean isAvailable() {
        return (!(writingEndClosed) || currentCapacity != 0);
    }

    public synchronized boolean notEmpty() {
        return currentCapacity != 0;
    }

    public synchronized boolean isFull() {
        return currentCapacity == queueCapacity;
    }

    public synchronized boolean isEmpty() {
        return currentCapacity == 0;
    }
}

Объектами вызова являются:

public class Reindeer implements Runnable {
    private final int ID;
    private final GiftQueue factoryQueue;
    private final GiftQueue santaGiftQueue;
    private final Semaphore serviceQueue;

    public Reindeer(int ID, GiftQueue factoryQueue, GiftQueue santaGiftQueue,
                    Semaphore serviceQueue) {
        this.ID = ID;
        this.factoryQueue = factoryQueue;
        this.santaGiftQueue = santaGiftQueue;
        this.serviceQueue = serviceQueue;
    }

    @Override
    public void run() {
        while (factoryQueue.isAvailable()) {
            Gift gift = null;
            System.out.println("Reindeer " + ID + " working");
            try {
                serviceQueue.acquire();
                if (factoryQueue.isEmpty()) {
                    serviceQueue.release();
                    continue;
                }
                gift = factoryQueue.tryPop(10);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally{
                serviceQueue.release();
            }
            if (gift != null) {
                santaGiftQueue.push(gift);
                System.out.println("Reindeer " + ID + " sent a gift");
            } else {
                System.out.println("Reindeer " + ID + " will try again ");
                awaitRandom();
            }

        }

        System.out.println("Reindeer " + ID + " finished");
    }

    private void awaitRandom() {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public String toString() {
        return ID + "";
    }

}

и

public class Factory implements Runnable {
    private static final PositionBuilder positionBuilder = new PositionBuilder();
    private final int maxGifts;
    private final int factoryID;
    private final int matrixSize;
    private final int numberOfElves;
    private final GiftQueue giftQueue;
    private final Thread[] elvesThreads;
    private final ArrayList<ElfRunnable> elves = new ArrayList<>();
    private static final ElfBuilder elfBuilder = new ElfBuilder();
    private final FactoryElfMap elfMap;
    private final Semaphore reindeerSemaphore;
    private final WorkshopStatus workshopStatus;
    private final Semaphore serviceQueue;
    private final Orchestrator orchestrator;
    private int producedGifts;


    public Factory(int factoryID, int numberOfToysTobeProduced, int matrixSize, int numberOfElves,
                   GiftQueue giftQueue, Orchestrator orchestrator,
                   FactoryElfMap elfMap,
                   Semaphore reindeerSemaphore,
                   WorkshopStatus workshopStatus,
                   Semaphore serviceQueue) {
        this.factoryID = factoryID;
        this.maxGifts = numberOfToysTobeProduced;
        this.matrixSize = matrixSize;
        this.numberOfElves = numberOfElves;
        this.giftQueue = giftQueue;
        this.orchestrator = orchestrator;
        this.elfMap = elfMap;
        this.reindeerSemaphore = reindeerSemaphore;
        this.workshopStatus = workshopStatus;
        this.serviceQueue = serviceQueue;
        this.producedGifts = 0;
        this.elvesThreads = new Thread[numberOfElves];
        System.out.println(numberOfElves);
    }

    @Override
    public void run() {
        createElves();
        startElves();

        while (goalNotAchieved()) {
            orchestrator.awaitSupervisorTurn();
            System.out.println("Factory " + factoryID + " is working..");
            queryPositions();
            orchestrator.setWorkersTurn();


        }
        joinElves();

        System.out.println("Factory " + factoryID + " FINISHED");


        workshopStatus.signalDone();
        giftQueue.close();
    }

    public synchronized boolean goalNotAchieved() {
        return !(producedGifts == maxGifts);
    }


    private void joinElves() {
        for (int i = 0; i < numberOfElves; i++) {
            try {
                elvesThreads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private void startElves() {
        for (int i = 0; i < numberOfElves; i++) {
            elvesThreads[i].start();
        }
    }

    private void createElves() {
        for (int i = 0; i < numberOfElves; i++) {
            IPosition elfPosition = positionBuilder.create(matrixSize, elfMap);
            ElfRunnable elfRunnable = elfBuilder.create(i, this, elfPosition,
                                                        orchestrator);
            elfMap.addEntry(elfRunnable, elfPosition);
            elfPosition.setOwner(elfRunnable);
            elvesThreads[i] = new Thread(elfRunnable);
        }
    }

    private synchronized void queryPositions() {
        try {
            reindeerSemaphore.acquire();
            System.out.println("Factory " + factoryID + " starting query....");
            for (ElfRunnable elf : elves) {
                System.out.println("   Queried " + elf + ": (" + elf.getElfPosition() + ")");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reindeerSemaphore.release();
        }

    }


    public synchronized void becomeAware(ElfRunnable elf) {
        System.out.println("Factory " + factoryID + " got new elf");
        elves.add(elf);
    }

    public synchronized void notifyGiftCreated(Gift gift) {
        try {
            reindeerSemaphore.acquire();
            if (producedGifts == maxGifts) {
                orchestrator.setWorkersTurn();
                System.out.println("Rejecting " + gift);
            } else {
                serviceQueue.acquire();
                producedGifts++;

                if (giftQueue.isFull()) {
                    serviceQueue.release();
                    reindeerSemaphore.release();
                    notifyGiftCreated(gift);
                    return;
                }
                giftQueue.push(gift);
                System.out.println("GIFTS: " + producedGifts);
                serviceQueue.release();

            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            reindeerSemaphore.release();
        }




    }


    @Override
    public String toString() {
        return factoryID + "";
    }


}

Пример вывода:

Gift Gift-1329879510-34 has been added to the queue. Queue size: 1
GIFTS: 400
Reindeer 0 working
Gift hasGift-1329879510-34 been pulled from queue. Queue size 0
released
Gift Gift-1329879510-34 has been added to the queue. Queue size: 400 // <--- ???
Reindeer 1 sent a gift
Reindeer 1 working
released
Reindeer 0 working
Gift Gift-1015004123-1 has been added to the queue. Queue size: 1
GIFTS: 401
Gift hasGift-1015004123-1 been pulled from queue. Queue size 0
released
Queue full, awaiting pop...
released
Reindeer 0 working
Gift Gift--1853705096-37 has been added to the queue. Queue size: 1
GIFTS: 402
Gift hasGift--1853705096-37 been pulled from queue. Queue size 0
released
Queue full, awaiting pop...
Gift Gift-899203208-33 has been added to the queue. Queue size: 1
GIFTS: 403
Gift Gift--948848140-41 has been added to the queue. Queue size: 2
GIFTS: 404
Gift Gift--389884399-19 has been added to the queue. Queue size: 3
GIFTS: 405
Gift Gift-615746031-32 has been added to the queue. Queue size: 4
GIFTS: 406
Gift Gift-1784101203-3 has been added to the queue. Queue size: 5
GIFTS: 407
Gift Gift--1790412473-6 has been added to the queue. Queue size: 6
GIFTS: 408
...