Связь между двумя потоками, созданными основным потоком в Java - PullRequest
1 голос
/ 27 марта 2020

Итак, мне нужно создать три потока Teller и 100 потоков клиента.

Каждый поток должен следовать последовательности действий, печатая запись каждого действия.

потоки Teller должны: уведомить клиента о том, что он доступен, принять идентификатор и транзакцию от клиента, ответить клиенту, и т. д.

у клиента аналогичный набор действий: ожидание в очереди, когда сигнализируется выбрать свободного кассира, et c

Чего я не понимаю, как мне заставить эти темы общаться?

Итак, я делаю три потока Teller из основного метода, 100 потоков клиента, как именно я могу подключить поток клиента к потоку кассира? Очевидно, что когда три потока кассира открываются, они могут принять только 3 клиента, поэтому 97 других клиентских потоков будут ожидать. Как остановить метод run () клиента, чтобы потоки все еще оставались живыми в ожидании открытого теллера?

Ответы [ 2 ]

0 голосов
/ 04 апреля 2020

Обмен данными между классами осуществляется с помощью методов класса. Аналогично, обмен сообщениями между Thread s осуществляется с помощью методов класса.

Если все локально, ie вам не нужно отправлять что-либо по сети, тогда вы можно попробовать следующий код:

import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;

public class Main {

    public static class MessageSource {
    }

    public static class Message<S extends MessageSource> {
        private final S origin;

        public Message(final S origin) {
            this.origin = Objects.requireNonNull(origin);
        }

        public S getOrigin() {
            return origin;
        }
    }

    public static class ClientRequest extends Message<Client> {
        public ClientRequest(final Client origin) {
            super(origin);
        }
    }

    public static class TellerResponse extends Message<Teller> {
        public TellerResponse(final Teller origin) {
            super(origin);
        }
    }

    public static class Teller extends MessageSource implements Runnable {
        private final Queue<ClientRequest> sharedQueue;

        public Teller(final Queue<ClientRequest> sharedQueue) {
            this.sharedQueue = Objects.requireNonNull(sharedQueue);
        }

        @Override
        public void run() {
            try {
                final Random rand = new Random();
                while (true) {
                    final ClientRequest r;
                    synchronized (sharedQueue) {
                        while (sharedQueue.isEmpty()) {
                            System.out.println("Teller " + hashCode() + " says queue is empty.");
                            sharedQueue.wait();
                        }
                        r = sharedQueue.poll();
                    }
                    System.out.println("Teller " + hashCode() + " currently seving request from Client " + r.getOrigin().hashCode() + "...");
                    Thread.sleep(250 + rand.nextInt(250)); //Delay a bit, to simulate serving the request and creating the response...
                    r.getOrigin().response(new TellerResponse(this)); //One could say that this simulates an RPC call :P
                }
            }
            catch (final InterruptedException ix) {
                System.err.println("Teller " + hashCode() + " stopped serving clients abruptly: " + ix);
            }
        }
    }

    public static class Client extends MessageSource implements Runnable {
        private final Queue<ClientRequest> sharedQueue;
        private TellerResponse privateQueue; //Since responses will be received here, I call this a queue (although it's not, because we know we can serve only one response at a time).

        public Client(final Queue<ClientRequest> sharedQueue) {
            this.sharedQueue = Objects.requireNonNull(sharedQueue);
        }

        public synchronized void response(final TellerResponse r) {
            privateQueue = r;
            notifyAll(); //Could be notify(). No difference would it make in this specific case.
        }

        @Override
        public void run() {
            //I'm just implementing random count of random-data requests...
            final Random rand = new Random();
            final int numberOfRequests = 5 + rand.nextInt(6);
            try {
                for (int i = 0; i < numberOfRequests; ++i) {
                    final ClientRequest req = new ClientRequest(this);
                    synchronized (sharedQueue) {
                        sharedQueue.add(req);
                        sharedQueue.notifyAll(); //Could be notify(). No difference would it make in this specific case.
                    }
                    synchronized (this) {
                        while (privateQueue == null)
                            wait();
                        System.out.println("Client " + hashCode() + " can consume the " + privateQueue.getOrigin().hashCode() + " Teller's response...");
                        privateQueue = null;
                    }
                }
            }
            catch (final InterruptedException ix) {
                System.err.println("Client " + hashCode() + " stopped receiving responses abruptly: " + ix);
            }
        }
    }

    public static void main(final String[] args) {
        final Queue<ClientRequest> requests = new LinkedList<>();

        for (int i = 0; i < 100; ++i)
            new Thread(new Client(requests)).start();

        for (int i = 0; i < 3; ++i)
            new Thread(new Teller(requests)).start();
    }
}

На один ClientRequest отвечает Teller с одним TellerResponse. Теперь вам нужно расширить классы ClientRequest и TellerResponse в соответствии с вашими потребностями (ie реализует, какие данные должны быть обменены).

Эта реализация представляет собой модель производитель-потребитель, в которой и кассиры, и Клиентами являются как производители, так и потребители. Эта реализация использует общую очередь для обмена сообщениями от клиентов к кассирам.

Нет никаких условий остановки для кассиров (что заставляет их ждать неопределенно в конце запросов клиентов), что заставит программу ждать вечно в конце запросов клиентов. Но это ожидаемо, поскольку у нас нет условия завершения.

Клиенты будут запускать от 5 до 10 запросов. Счетчики задержат каждый ответ от 250 до 500 мс. С 3 кассирами и 100 клиентами это дает нам время работы от 42 до 167 секунд.

Более реальный c подход к коммуникации, который, я думаю, будет использовать PipedInputStream s и PipedOutputStream s для симуляции трафика c через блокирующие потоки (например, сетевой трафик c).

0 голосов
/ 27 марта 2020

Вы можете использовать Обратный отсчет .

По сути это работает так: Сигнал обратного отсчета помогает, заставляя клиента ждать, если все кассиры заняты в данный момент, и уведомляет клиентский поток если количество сигналов достигает 0 (что означает, что настала моя очередь).

Я предлагаю создать сигнал защелки обратного отсчета с количеством, равным 1, для каждого клиента и сохранить их в структуре данных (я рекомендую очередь) Таким образом, каждый раз, когда кассир работает с клиентом, просто извлекает сигнал защелки обратного отсчета из очереди, затем уменьшает счетчик снятого сигнала до 0, чтобы клиент с этим сигналом получал уведомление.

Я рекомендую очередь, потому что после каждый всплеск удаляет элемент, поэтому нет никаких гонок данных (поток читает один и тот же элемент в одно и то же время, поэтому клиент обслуживается двумя потоками).

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...