Я пытаюсь написать многопоточную серверную программу с 2 серверами. Я тестирую с 2 клиентами. Иногда в зависимости от хэш-кода md5 клиент, подключенный к серверу, должен отключиться от указанного сервера и подключиться к другому. Иногда это происходит без проблем, и иногда я получаю java. net .SocketException: сокет закрыт. Вот код
Класс брокера (сервер):
import java.io.IOException;
import java.math.BigInteger;
import java.net.ServerSocket;
import java.net.Socket;
import java.security.*;
import java.util.ArrayList;
import java.util.List;
public class Broker extends Node implements Runnable {
private static List<Publisher> registeredpublishers = new ArrayList<Publisher>();
private static List<Consumer> registeredConsumers = new ArrayList<Consumer>();
public static List<Consumer> GetConsumers(){
return registeredConsumers;
}
public String Name;
public Integer port;
public Broker(Integer port,String name){
this.port=port;
this.Name=name;
}
ServerSocket providerSocket;
Socket connection = null;
String ip="127.0.0.1";
BigInteger myKeys;
public void run(){
calculateKeys();
Node.getBrokers().add(this);
openServer();
}
void calculateKeys(){
String g =ip+ (port != null ? port.toString() : null);
MessageDigest m = null;
try {
m = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
m.reset();
m.update(g.getBytes());
byte[] digest = m.digest();
myKeys = new BigInteger(1,digest);
BigInteger a=new BigInteger("25");
myKeys=myKeys.mod(a);
System.out.println(myKeys);
}
void openServer()throws NullPointerException {
try {
providerSocket = new ServerSocket(this.port, 10);
while (true) {
acceptConnection();
new BrokerHandler(connection,this).start();
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
providerSocket.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
void acceptConnection()throws NullPointerException {
try {
connection = providerSocket.accept();
} catch (IOException e) {
e.printStackTrace();
}
System.out.println("client connected.");
{
}
}
public static void main(String args[]) {
new Thread(new Broker(54319,"First")).start();
new Thread(new Broker(12320,"Second")).start();
}
}
Класс BrokerHandler
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.net.Socket;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
public class BrokerHandler extends Thread implements Serializable {
ObjectInputStream in;
ObjectOutputStream out;
String f;
BigInteger theirKeys;
Broker broker;
Object e;
Message request;
public BrokerHandler(Socket connection,Broker broker) throws NullPointerException{
try {
in = new ObjectInputStream(connection.getInputStream());
out =new ObjectOutputStream(connection.getOutputStream());
try {
this.request=(Message)in.readObject();
this.f=request.artist;
this.e =request.entity;
this.broker=broker;
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void run(){
if(this.e instanceof Consumer){
calculateMessageKeys(this.request);
checkBroker(this.broker,(Consumer) e);
}
}
public synchronized void disconnect(Socket connection){
try {
in.close();
out.close();
} catch (IOException ex) {
ex.printStackTrace();
}finally {
try {
connection.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
public void checkBroker(Broker broker,Consumer consumer) {
int intMyKeys = broker.myKeys.intValue();
int intTheirKeys = theirKeys.intValue();
if (intTheirKeys > 23) {
intTheirKeys = intTheirKeys % 23;
}
if (intTheirKeys <= intMyKeys && intTheirKeys >= intMyKeys - 11) {
consumer.Register(broker, f);
System.out.println(broker.Name + "Client Connected and Registered");
} else {
int thePort = 0;
System.out.println(broker.Name + "Client changing server");
for (Broker broker1 : Node.getBrokers()) {
int KEYS = broker1.myKeys.intValue();
if (intTheirKeys <= KEYS && intTheirKeys >= KEYS - 11) {
thePort = broker1.port;
System.out.println(thePort);
}
}
disconnect(broker.connection);
Consumer a = new Consumer(consumer.artist, thePort);
new ConsumerHandler(a).start();
}
}
public void calculateMessageKeys(Message request) {
MessageDigest m = null;
try {
m = MessageDigest.getInstance("MD5");
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
m.reset();
m.update(f.getBytes());
byte[] digest = m.digest();
theirKeys = new BigInteger(1,digest);
BigInteger mod=new BigInteger("25");
theirKeys=theirKeys.mod(mod);
System.out.println(theirKeys);
}
}
Класс потребителя
import java.io.*;
import java.util.Random;
public class Consumer extends Node implements Serializable {
String artist;
Random r=new Random();
int max=12320;
int min=54319;
int port;
Message request;
public Consumer(String artist){
this.artist=artist;
this.port=new Random().nextBoolean() ? max : min;
request= new Message(artist,this.getConsumer());
}
public Consumer(String artist,int port){
this.artist=artist;
this.port=port;
request= new Message(artist,this.getConsumer());
}
public Consumer getConsumer(){
return this;
}
public void Register(Broker broker,String ArtistName){
broker.GetConsumers().add(this);
System.out.println(this.artist);
}
}
Класс ConsumerHandle
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
public class ConsumerHandler extends Thread{
Consumer consumer;
public ConsumerHandler(Consumer consumer){
this.consumer=consumer;
}
void connect(int port) {
Socket requestSocket=null;
ObjectOutputStream out = null;
ObjectInputStream in = null;
try {
requestSocket = new Socket("127.0.0.1", consumer.port);
out = new ObjectOutputStream(requestSocket.getOutputStream());
in = new ObjectInputStream(requestSocket.getInputStream());
System.out.println("Message created.");
out.writeObject(consumer.request);
} catch (UnknownHostException unknownHost) {
System.err.println("You are trying to connect to an unknown host!");
} catch (IOException ioException) {
ioException.printStackTrace();
} finally {
try {
in.close();
out.close();
requestSocket.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
public void run(){
connect(consumer.port);
}
public static void main(String args[]) {
Consumer a=new Consumer("Kevin MacLeod");
Consumer b=new Consumer("Alexander Narakada");
new ConsumerHandler(a).start();
new ConsumerHandler(b).start();
}
}
и трассировка стека, когда это происходит:
java.net.SocketException: Socket closed
at java.base/sun.nio.ch.NioSocketImpl.ensureOpenAndConnected(NioSocketImpl.java:166)
at java.base/sun.nio.ch.NioSocketImpl.beginRead(NioSocketImpl.java:232)
at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:300)
at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:351)
at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:802)
at java.base/java.net.Socket$SocketInputStream.read(Socket.java:937)
at java.base/java.net.Socket$SocketInputStream.read(Socket.java:932)
at java.base/java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2778)
at java.base/java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:3105)
at java.base/java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:3115)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1597)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2410)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2304)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2142)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at BrokerHandler.<init>(BrokerHandler.java:26)
at Broker.openServer(Broker.java:59)
at Broker.run(Broker.java:34)
at java.base/java.lang.Thread.run(Thread.java:830)