Почему я иногда получаю java. net .SocketException: сокет закрыт на многопоточном сервере - PullRequest
0 голосов
/ 23 марта 2020

Я пытаюсь написать многопоточную серверную программу с 2 серверами. Я тестирую с 2 клиентами. Иногда в зависимости от хэш-кода md5 клиент, подключенный к серверу, должен отключиться от указанного сервера и подключиться к другому. Иногда это происходит без проблем, и иногда я получаю java. net .SocketException: сокет закрыт. Вот код

Класс брокера (сервер):

    import java.io.IOException;
import java.math.BigInteger;
import java.net.ServerSocket;
import java.net.Socket;
import java.security.*;

import java.util.ArrayList;
import java.util.List;

public class Broker extends Node implements Runnable {
    private static List<Publisher> registeredpublishers = new ArrayList<Publisher>();
    private static List<Consumer> registeredConsumers = new ArrayList<Consumer>();
    public static List<Consumer> GetConsumers(){
        return registeredConsumers;
    }
    public String Name;
    public Integer port;
    public Broker(Integer port,String name){
        this.port=port;
        this.Name=name;
    }

    ServerSocket providerSocket;
    Socket connection = null;
    String ip="127.0.0.1";
    BigInteger myKeys;

    public void run(){
        calculateKeys();
        Node.getBrokers().add(this);

        openServer();

    }
    void calculateKeys(){
        String g =ip+ (port != null ? port.toString() : null);
        MessageDigest m = null;
        try {
            m = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }
        m.reset();
        m.update(g.getBytes());
        byte[] digest = m.digest();
        myKeys = new BigInteger(1,digest);
        BigInteger a=new BigInteger("25");
        myKeys=myKeys.mod(a);
        System.out.println(myKeys);

    }
    void openServer()throws NullPointerException {
        try {
            providerSocket = new ServerSocket(this.port, 10);
            while (true) {
                acceptConnection();
                new BrokerHandler(connection,this).start();

            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                providerSocket.close();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }

    void acceptConnection()throws NullPointerException {
        try {
            connection = providerSocket.accept();

        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("client connected.");
        {


        }
    }
    public static void main(String args[]) {

        new Thread(new Broker(54319,"First")).start();
        new Thread(new Broker(12320,"Second")).start();


    }
    }

Класс BrokerHandler

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.math.BigInteger;
import java.net.Socket;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class BrokerHandler extends Thread implements Serializable {
    ObjectInputStream in;
    ObjectOutputStream out;
    String f;
    BigInteger theirKeys;
    Broker broker;
    Object e;
    Message request;



    public BrokerHandler(Socket connection,Broker broker) throws NullPointerException{
        try {
            in = new ObjectInputStream(connection.getInputStream());
            out =new ObjectOutputStream(connection.getOutputStream());
            try {
                this.request=(Message)in.readObject();
                this.f=request.artist;
               this.e =request.entity;
                this.broker=broker;

            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public void run(){
        if(this.e instanceof Consumer){
            calculateMessageKeys(this.request);
            checkBroker(this.broker,(Consumer) e);
        }

    }
    public synchronized void disconnect(Socket connection){
        try {
            in.close();
            out.close();
        } catch (IOException ex) {
            ex.printStackTrace();
        }finally {
            try {
                connection.close();
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    public void checkBroker(Broker broker,Consumer consumer) {


        int intMyKeys = broker.myKeys.intValue();
        int intTheirKeys = theirKeys.intValue();
        if (intTheirKeys > 23) {
            intTheirKeys = intTheirKeys % 23;
        }
        if (intTheirKeys <= intMyKeys && intTheirKeys >= intMyKeys - 11) {
            consumer.Register(broker, f);
            System.out.println(broker.Name + "Client Connected and Registered");
        } else {
            int thePort = 0;
            System.out.println(broker.Name + "Client changing server");
            for (Broker broker1 : Node.getBrokers()) {
                int KEYS = broker1.myKeys.intValue();
                if (intTheirKeys <= KEYS && intTheirKeys >= KEYS - 11) {
                    thePort = broker1.port;
                    System.out.println(thePort);
                }
            }
            disconnect(broker.connection);
            Consumer a = new Consumer(consumer.artist, thePort);
            new ConsumerHandler(a).start();
        }
    }
    public  void calculateMessageKeys(Message request)  {
        MessageDigest m = null;
        try {
            m = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            e.printStackTrace();
        }

        m.reset();
        m.update(f.getBytes());
        byte[] digest = m.digest();
        theirKeys = new BigInteger(1,digest);
        BigInteger mod=new BigInteger("25");
        theirKeys=theirKeys.mod(mod);
        System.out.println(theirKeys);
    }
}

Класс потребителя

import java.io.*;
import java.util.Random;
public class Consumer extends Node implements Serializable  {

          String artist;
    Random r=new Random();
    int max=12320;
    int min=54319;
    int port;
    Message request;


    public Consumer(String artist){
              this.artist=artist;
              this.port=new Random().nextBoolean() ? max : min;
              request= new Message(artist,this.getConsumer());
          }
     public Consumer(String artist,int port){
        this.artist=artist;
        this.port=port;
         request= new Message(artist,this.getConsumer());
     }
          public Consumer getConsumer(){
        return this;
    }

         public void Register(Broker broker,String ArtistName){
             broker.GetConsumers().add(this);
             System.out.println(this.artist);

         }

}

Класс ConsumerHandle

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;
import java.net.UnknownHostException;

public class ConsumerHandler extends Thread{


    Consumer consumer;
    public ConsumerHandler(Consumer consumer){
        this.consumer=consumer;
    }
    void connect(int port) {
        Socket requestSocket=null;
        ObjectOutputStream out = null;
        ObjectInputStream in = null;
        try {

            requestSocket = new Socket("127.0.0.1", consumer.port);
            out = new ObjectOutputStream(requestSocket.getOutputStream());
            in = new ObjectInputStream(requestSocket.getInputStream());
            System.out.println("Message created.");
            out.writeObject(consumer.request);
        } catch (UnknownHostException unknownHost) {
            System.err.println("You are trying to connect to an unknown host!");
        } catch (IOException ioException) {
            ioException.printStackTrace();
        } finally {
            try {
                in.close();
                out.close();
                requestSocket.close();
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }
    }
    public void run(){
        connect(consumer.port);
    }
    public static void main(String args[]) {
        Consumer a=new Consumer("Kevin MacLeod");
        Consumer b=new Consumer("Alexander Narakada");

        new ConsumerHandler(a).start();
        new ConsumerHandler(b).start();

    }
}

и трассировка стека, когда это происходит:

java.net.SocketException: Socket closed
    at java.base/sun.nio.ch.NioSocketImpl.ensureOpenAndConnected(NioSocketImpl.java:166)
    at java.base/sun.nio.ch.NioSocketImpl.beginRead(NioSocketImpl.java:232)
    at java.base/sun.nio.ch.NioSocketImpl.implRead(NioSocketImpl.java:300)
    at java.base/sun.nio.ch.NioSocketImpl.read(NioSocketImpl.java:351)
    at java.base/sun.nio.ch.NioSocketImpl$1.read(NioSocketImpl.java:802)
    at java.base/java.net.Socket$SocketInputStream.read(Socket.java:937)
    at java.base/java.net.Socket$SocketInputStream.read(Socket.java:932)
    at java.base/java.io.ObjectInputStream$PeekInputStream.peek(ObjectInputStream.java:2778)
    at java.base/java.io.ObjectInputStream$BlockDataInputStream.peek(ObjectInputStream.java:3105)
    at java.base/java.io.ObjectInputStream$BlockDataInputStream.peekByte(ObjectInputStream.java:3115)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1597)
    at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2410)
    at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2304)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2142)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at BrokerHandler.<init>(BrokerHandler.java:26)
    at Broker.openServer(Broker.java:59)
    at Broker.run(Broker.java:34)
    at java.base/java.lang.Thread.run(Thread.java:830)
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...