Потеря объектов через соединения сокетов с использованием потоков ввода / вывода объектов в Java? - PullRequest
0 голосов
/ 27 января 2020

Я пишу проект сокращения карт, в котором маперы (клиенты в моей программе) отправляют объект Map на редукторы (серверы). Затем клиенты отправляют объект Boolean на все серверы (редукторы) как уведомление о том, что оно закончило отправку. Программа работает большую часть времени, но иногда редуктор (ы) никогда не получают объект Boolean. Может кто-нибудь сказать мне, что я делаю не так?

Вот код редуктора (сервера): (я думаю, что проблема здесь где-то здесь)

  private void receiveFromMappers() throws ClassNotFoundException, IOException {
    int doneMappers = 0;
    while (doneMappers != numOfMappers) { //keep receving until all mappers send Boolean

      Object readObject = receiveObject();
      if (readObject instanceof Map) {
        storeInInputMap((Map) readObject);
      } else if (readObject instanceof Boolean) {
        doneMappers++;
      }
    }
  }

  private Object receiveObject() throws ClassNotFoundException, IOException {

    Socket socket = nodeSocket.accept();
    InputStream inputStream = socket.getInputStream();
    ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
    Object readObject = objectInputStream.readObject();
    objectInputStream.close();
    return readObject;

  }

Код Mappers (клиентов):

  private void sendDataToReducers() throws IOException {
    for (int i = 0; i < numOfReducers ; i++) {
      sendObjToReducer(i, reducersMaps[i]);
      notifyReducer(i);
    }
  }

  private void notifyReducer(int reducerNum) throws IOException {
    sendObjToReducer(reducerNum, Boolean.TRUE);
  }

  private void sendObjToReducer(int reducerNum, Object obj) throws IOException {
    String ip= REDUCERS_IP.replace("?", Integer.toString(reducerNum));

    boolean failedConnection=true;
    Socket socket=null;
    while (failedConnection){ //I'm running Docker containers, so I need the mapper to keep trying until the reducer container is created
      try {
        socket = new Socket(ip, REDUCERS_PORT_NUM);
        failedConnection=false;
      } catch (IOException e){
        try {
          Thread.sleep(1000);
        } catch (InterruptedException ex) {
          throw new RuntimeException("Unexpected interrupt: ", ex);
        }
      }
    }

    OutputStream outputStream = socket.getOutputStream();
    ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);

    objectOutputStream.writeObject(obj);

    objectOutputStream.flush();
    objectOutputStream.close();

  }

РЕДАКТИРОВАТЬ: код не вызывает каких-либо исключений, это то, что редуктор или некоторые из них могут застрять в строке socket.accept().

...