Обмен данными между классами осуществляется с помощью методов класса. Аналогично, обмен сообщениями между Thread
s осуществляется с помощью методов класса.
Если все локально, ie вам не нужно отправлять что-либо по сети, тогда вы можно попробовать следующий код:
import java.util.LinkedList;
import java.util.Objects;
import java.util.Queue;
import java.util.Random;
public class Main {
public static class MessageSource {
}
public static class Message<S extends MessageSource> {
private final S origin;
public Message(final S origin) {
this.origin = Objects.requireNonNull(origin);
}
public S getOrigin() {
return origin;
}
}
public static class ClientRequest extends Message<Client> {
public ClientRequest(final Client origin) {
super(origin);
}
}
public static class TellerResponse extends Message<Teller> {
public TellerResponse(final Teller origin) {
super(origin);
}
}
public static class Teller extends MessageSource implements Runnable {
private final Queue<ClientRequest> sharedQueue;
public Teller(final Queue<ClientRequest> sharedQueue) {
this.sharedQueue = Objects.requireNonNull(sharedQueue);
}
@Override
public void run() {
try {
final Random rand = new Random();
while (true) {
final ClientRequest r;
synchronized (sharedQueue) {
while (sharedQueue.isEmpty()) {
System.out.println("Teller " + hashCode() + " says queue is empty.");
sharedQueue.wait();
}
r = sharedQueue.poll();
}
System.out.println("Teller " + hashCode() + " currently seving request from Client " + r.getOrigin().hashCode() + "...");
Thread.sleep(250 + rand.nextInt(250)); //Delay a bit, to simulate serving the request and creating the response...
r.getOrigin().response(new TellerResponse(this)); //One could say that this simulates an RPC call :P
}
}
catch (final InterruptedException ix) {
System.err.println("Teller " + hashCode() + " stopped serving clients abruptly: " + ix);
}
}
}
public static class Client extends MessageSource implements Runnable {
private final Queue<ClientRequest> sharedQueue;
private TellerResponse privateQueue; //Since responses will be received here, I call this a queue (although it's not, because we know we can serve only one response at a time).
public Client(final Queue<ClientRequest> sharedQueue) {
this.sharedQueue = Objects.requireNonNull(sharedQueue);
}
public synchronized void response(final TellerResponse r) {
privateQueue = r;
notifyAll(); //Could be notify(). No difference would it make in this specific case.
}
@Override
public void run() {
//I'm just implementing random count of random-data requests...
final Random rand = new Random();
final int numberOfRequests = 5 + rand.nextInt(6);
try {
for (int i = 0; i < numberOfRequests; ++i) {
final ClientRequest req = new ClientRequest(this);
synchronized (sharedQueue) {
sharedQueue.add(req);
sharedQueue.notifyAll(); //Could be notify(). No difference would it make in this specific case.
}
synchronized (this) {
while (privateQueue == null)
wait();
System.out.println("Client " + hashCode() + " can consume the " + privateQueue.getOrigin().hashCode() + " Teller's response...");
privateQueue = null;
}
}
}
catch (final InterruptedException ix) {
System.err.println("Client " + hashCode() + " stopped receiving responses abruptly: " + ix);
}
}
}
public static void main(final String[] args) {
final Queue<ClientRequest> requests = new LinkedList<>();
for (int i = 0; i < 100; ++i)
new Thread(new Client(requests)).start();
for (int i = 0; i < 3; ++i)
new Thread(new Teller(requests)).start();
}
}
На один ClientRequest
отвечает Teller
с одним TellerResponse
. Теперь вам нужно расширить классы ClientRequest
и TellerResponse
в соответствии с вашими потребностями (ie реализует, какие данные должны быть обменены).
Эта реализация представляет собой модель производитель-потребитель, в которой и кассиры, и Клиентами являются как производители, так и потребители. Эта реализация использует общую очередь для обмена сообщениями от клиентов к кассирам.
Нет никаких условий остановки для кассиров (что заставляет их ждать неопределенно в конце запросов клиентов), что заставит программу ждать вечно в конце запросов клиентов. Но это ожидаемо, поскольку у нас нет условия завершения.
Клиенты будут запускать от 5 до 10 запросов. Счетчики задержат каждый ответ от 250 до 500 мс. С 3 кассирами и 100 клиентами это дает нам время работы от 42 до 167 секунд.
Более реальный c подход к коммуникации, который, я думаю, будет использовать PipedInputStream
s и PipedOutputStream
s для симуляции трафика c через блокирующие потоки (например, сетевой трафик c).