Настройка клиентского сокета и прослушивателя ServerSocket (Java) - PullRequest
0 голосов
/ 11 мая 2018

Я пытаюсь установить одноранговое соединение в Java.

Я пытаюсь настроить свою программу на прослушивание входящего соединения, в то время как внешне я могу подключиться к другому клиенту.

Как создать экземпляр моего сокетного соединения: socketConnection как все, что связано с программой.В идеале так:

if(socketConnection.isConnectedToExternalPeer()){
//do stuff
} else if (socketConnection.hasAnIncomingConnection()){
//do stuff
}

После ознакомления с решением @ L.Spillner, я собрал следующий код ниже, эта единственная проблема заключается в том, что я не совсем понимаю, как мне принять соединениеЭто видно из того факта, что когда я пытаюсь настроить потоки, программа завершает цикл в ожидании ответа партнера:

public class Client implements AutoCloseable {

    // Any other ThreadPool can be used as well
    private ExecutorService cachedExecutor = null;
    private ExecutorService singleThreadExecutor = null;
    // port this client shall listen on
    private int port = 0;

    // Name of the client
    private String name = null;

    // indicates that a connection is ongoing
    private boolean isConnected = false;

    // the socket the Client is currently connected with
    private Socket activeConenctionSocket = null;

    // The ServerSocket which will be listening for any incoming connection
    private ServerSocket listener = null;

    // The socket which has been accepted by the ServerSocket
    private Future<Socket> acceptedSocket;


    private ObjectInputStream inputStream = null;


    private ObjectOutputStream outputStream = null;

    private BloomChain bloomChain = null;


    /**
     * @param port Port number by which this client shall be accessed.
     * @param name The name of this Client.
     */
    public Client( int port, String name )
    {
        this.port = port;
        this.name = name;
        this.bloomChain = new BloomChain();

        this.cachedExecutor = Executors.newCachedThreadPool();
        this.singleThreadExecutor = Executors.newSingleThreadExecutor();

        this.listener = createListeningSocket();
        startListening();
    }

    private ServerSocket createListeningSocket()
    {

        ServerSocket temp = null;
        try
        {
            temp = new ServerSocket( this.port );

        }
        catch ( IOException e )
        {
            e.printStackTrace();
        }

        return temp;
    }

    private void startListening()
    {
        if ( !this.isConnected )
        {
            this.listener = createListeningSocket();
            this.acceptedSocket = this.cachedExecutor.submit( new ServAccept( this.listener ) );

        }
    }



    /**
     * Attempts to connect to any other socket specified by the hostname and the targetport.
     *
     * @param host The hostname of the target to connect.
     * @param targetport The port of the target.
     */
    public void connect( String host, int targetport )
    {

        try
        {   System.out.println(host);
            System.out.println(targetport);
            this.activeConenctionSocket = new Socket( InetAddress.getByName( host ), targetport );

            setUpStreams(this.activeConenctionSocket);

            this.isConnected = true;

            System.out.println(InetAddress.getAllByName(host));
        }
        catch ( IOException e )
        {
            e.printStackTrace();
        }

        try
        {
            this.listener.close();
        }
        catch ( IOException e )
        {
            // this will almost certainly throw an exception but it is intended.
        }
    }

    public void setUpStreams(Socket socket) throws IOException {
        this.outputStream = new ObjectOutputStream(socket.getOutputStream());
        this.outputStream.flush();
        this.inputStream = new ObjectInputStream(socket.getInputStream());
    }

    @Override
    public void close() throws Exception
    {
        // close logic (can be rather nasty)
    }

    public void sendMessage(String message){

        if(bloomChain.size()<1){
            bloomChain.addBlock(new Block(message, "0"));
        } else {
            bloomChain.addBlock(new Block(message, bloomChain.get(bloomChain.size()-1).getPreviousHash()));
        }
        try {

            this.outputStream.writeObject(bloomChain);
            this.outputStream.flush();

        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    public String mineMessage(){
        final String[] receivedMessage = {null};
        final Block tempBlock = this.bloomChain.get(this.bloomChain.size()-1);

        this.singleThreadExecutor.submit(()->{
            tempBlock.mineBlock(bloomChain.getDifficulty());
            receivedMessage[0] = tempBlock.getData();

        });

        return receivedMessage[0];
    }

    public String dataListener(){
        if(isConnected) {
            try {
                BloomChain tempChain = (BloomChain) this.inputStream.readObject();
                if (tempChain.isChainValid()) {
                    this.bloomChain = tempChain;
                    return mineMessage();
                }

            } catch (IOException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }

        return null;
    }

    public ServerSocket getListener() {
        return this.listener;
    }

    public boolean isConnected(){
        return isConnected;
    }

    public ObjectOutputStream getOutputStream(){
        return this.outputStream;
    }

    public ObjectInputStream getInputStream(){
        return this.inputStream;
    }



}

РЕДАКТИРОВАТЬ 2: Я пыталсяждите, пока acceptedSocket.get() вернет сокет в отдельном потоке следующим образом:

new Thread(()->{
    setupStreams(this.acceptedSocket.get());
    //try-catch blocks omitted 
}).start();

Это успешно ожидает acceptedSocket возврата подключенного сокета, однако, когда я пытаюсь подключиться к другому локально работающему клиенту, я получаюследующая ошибка: java.net.SocketException: socket closed

1 Ответ

0 голосов
/ 11 мая 2018

Хорошо, после некоторой работы я наконец-то нашёл изящное решение:

Мы хотим иметь возможность прослушивать и подключаться одновременно, поэтому нам нужно ServerSocket и выполнить вызов ServerSocket#accept для приема входящих соединений.
Однако этот метод блокирует поток, поэтому для продолжения работы с нашей программой мы должны перенести этот вызов в другой поток, и, к счастью, API Java по умолчанию предоставляет простой способ сделать это.

Следующий пример кода не завершен , но обеспечивает основные функции:

Client.java:

public class Client
    implements AutoCloseable
{
  // Any other ThreadPool can be used as well
  private ExecutorService es = Executors.newCachedThreadPool();

  // port this client shall listen on
  private int port;

  // Name of the client
  private String name;

  // indicates that a connection is ongoing
  private boolean isConnected = false;

  // the socket the Client is currently connected with
  private Socket activeConenctionSocket;

  // The ServerSocket which will be listening for any incoming connection
  private ServerSocket listener;

  // The socket which has been accepted by the ServerSocket
  private Future<Socket> acceptedSocket;

  /**
   * @param port Port number by which this client shall be accessed.
   * @param name The name of this Client.
   */
  public Client( int port, String name )
  {
    this.port = port;
    this.name = name;
    this.listener = createListeningSocket();
    startListening();
  }

  private ServerSocket createListeningSocket()
  {

    ServerSocket temp = null;
    try
    {
      temp = new ServerSocket( port );
    }
    catch ( IOException e )
    {
      e.printStackTrace();
    }

    return temp;
  }

  private void startListening()
  {
    if ( !isConnected )
    {
      listener = createListeningSocket();
      acceptedSocket = es.submit( new ServAccept( listener ) );
    }
  }

  /**
   * Attempts to connect to any other socket specified by the hostname and the targetport.
   * 
   * @param host The hostname of the target to connect.
   * @param targetport The port of the target.
   */
  public void connect( String host, int targetport )
  {
    isConnected = true;
    try
    {
      activeConenctionSocket = new Socket( InetAddress.getByName( host ), targetport );
    }
    catch ( IOException e )
    {
      e.printStackTrace();
    }

    try
    {
      listener.close();
    }
    catch ( IOException e )
    {
      // this will almost certainly throw an exception but it is intended.
    }
  }

  @Override
  public void close() throws Exception
  {
    // close logic (can be rather nasty)
  }
}

Давайте шаг за шагом пройдемся по тому, как мы создаем новый объект Client:

  1. Когда мы создаем экземпляр нашего объекта, мы создаем новый ServerSocket
  2. Мы начинаем листинг с создания нового потока Callable<V> объекта, который я назвал ServAccept в качестве примера.
  3. Теперь у нас есть объект Future<T>, который будет содержать сокет, если любое соединение будет принято.

Положительным побочным эффектом метода startListening() является то, что вы можете сделать его общедоступным и вызвать его еще раз, если соединение разорвано.

Метод conenct(...) работает почти так же, как ваш setupConnection() метод, но с небольшим поворотом. ServerSocket, который все еще прослушивает другой поток, будет закрыт. Причина этого заключается в том, что нет другого способа выйти из метода accept(), в котором застрял другой поток.

Последнее (что вы должны выяснить) - это когда нужно проверить, готов ли объект Future уже.

ServAccept.java

public class ServAccept
    implements Callable<Socket>
{
  ServerSocket serv;

  public ServAccept( ServerSocket sock )
  {
    this.serv = sock;
  }

  @Override
  public Socket call() throws Exception
  {
    return serv.accept();
  }

}

EDIT:

На самом деле я должен признать, что мой подход может быть не очень хорошо подходящим для этой задачи, поэтому я решил изменить некоторые настройки. На этот раз вместо использования Future Object я решил пойти с Events / пользовательским EventListener, который просто сидит там и слушает соединение для получения. Я проверил функциональность соединения, и оно работает просто отлично, но я не реализовал решение, чтобы определить, действительно ли Клиент подключился к пиру. Я просто убедился, что клиент может удерживать только одно соединение за раз.

Изменения:

ServerAccept.java

import java.io.IOException;
import java.net.ServerSocket;

public class ServAccept implements Runnable
{
    private ServerSocket serv;
    private ConnectionReceivedListener listener;

    public ServAccept( ServerSocket sock,ConnectionReceivedListener con )
    {
        this.serv = sock;
        this.listener = con;
    }

    @Override
    public void run()
    {
        try
        {
            listener.onConnectionReceived( new ConnectionReceivedEvent( serv.accept() ) );
        } catch (IOException e)
        {
            // planned exception here.
        }
    }
}

Больше не реализует Callable<V>, но Runnable единственная причина этого изменения состоит в том, что мы больше не ожидаем никакого возврата, так как будем работать со слушателем и некоторыми сочными событиями. В любом случае, для этого нам нужно создать и передать прослушиватель этому объекту. Но сначала мы должны взглянуть на структуру слушателя / события:

ConnectionReceivedListener.java

import java.util.EventListener;

@FunctionalInterface
public interface ConnectionReceivedListener extends EventListener
{
    public void onConnectionReceived(ConnectionReceivedEvent event);
}

Просто простой интерфейс, из которого мы строим некоторые анонимные классы или лямбда-выражения. Ничего особенного. Не нужно даже расширять интерфейс EventListener, но я люблю делать это, чтобы напомнить мне, какова цель этого класса.

ConnectionReceivedEvent.java

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class ConnectionReceivedEvent
{
    private Socket accepted;

    public ConnectionReceivedEvent( Socket sock )
    {
        this.accepted = sock;
    }

    public Socket getSocket()
    {
        return accepted;
    }

    public OutputStream getOutput() throws IOException
    {
        return accepted.getOutputStream();
    }

    public InputStream getInput() throws IOException
    {
        return accepted.getInputStream();
    }

    public int getPort()
    {
        return accepted.getPort();
    }
}

Ничего особенного, просто передавая Socket в качестве параметра конструктора и определяя некоторые методы получения, из которых большинство не будет использоваться в этом примере.

Но как нам теперь его использовать?

private void startListening()
{
    if (!isConnected)
    {
        closeIfNotNull();
        listener = createListeningSocket();
        es.execute( new ServAccept( listener, event -> setAccepted( event.getSocket() ) ) );
    }
}

private void setAccepted( Socket socket )
{
    if (!isConnected)
    {
        this.activeConenctionSocket = socket;
        setUpStreams( socket );
    } else
    {
        sendError( socket );
    }
}

Мы все еще используем наш ExecutorService и создаем новый поток с классом ServAccept. Однако, поскольку мы не ожидаем никакого возврата, я изменил с ExecutorService#submit на ExecutorService#execute (просто вопрос мнения и вкуса).
Но ServAccept нужно два аргумента сейчас. ServerSocket и Listener для использования. К счастью, мы можем использовать анонимные классы, и поскольку наш слушатель имеет только один метод, мы можем даже использовать лямбда-выражения. event -> setAccepted(event.getSocket()).

В качестве ответа на ваше 2-е редактирование : Я сделал логическую ошибку. Не метод ServerSocket#close вызывает исключение при прерывании вызова ServerSocket#accept, а сам вызов accept() вызывает исключение. Другими словами, исключение, которое вы получили, было предназначено, и я по ошибке исключил другое.

...