Я пишу проект сокращения карт, в котором маперы (клиенты в моей программе) отправляют объект Map
на редукторы (серверы). Затем клиенты отправляют объект Boolean
на все серверы (редукторы) как уведомление о том, что оно закончило отправку. Программа работает большую часть времени, но иногда редуктор (ы) никогда не получают объект Boolean. Может кто-нибудь сказать мне, что я делаю не так?
Вот код редуктора (сервера): (я думаю, что проблема здесь где-то здесь)
private void receiveFromMappers() throws ClassNotFoundException, IOException {
int doneMappers = 0;
while (doneMappers != numOfMappers) { //keep receving until all mappers send Boolean
Object readObject = receiveObject();
if (readObject instanceof Map) {
storeInInputMap((Map) readObject);
} else if (readObject instanceof Boolean) {
doneMappers++;
}
}
}
private Object receiveObject() throws ClassNotFoundException, IOException {
Socket socket = nodeSocket.accept();
InputStream inputStream = socket.getInputStream();
ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
Object readObject = objectInputStream.readObject();
objectInputStream.close();
return readObject;
}
Код Mappers (клиентов):
private void sendDataToReducers() throws IOException {
for (int i = 0; i < numOfReducers ; i++) {
sendObjToReducer(i, reducersMaps[i]);
notifyReducer(i);
}
}
private void notifyReducer(int reducerNum) throws IOException {
sendObjToReducer(reducerNum, Boolean.TRUE);
}
private void sendObjToReducer(int reducerNum, Object obj) throws IOException {
String ip= REDUCERS_IP.replace("?", Integer.toString(reducerNum));
boolean failedConnection=true;
Socket socket=null;
while (failedConnection){ //I'm running Docker containers, so I need the mapper to keep trying until the reducer container is created
try {
socket = new Socket(ip, REDUCERS_PORT_NUM);
failedConnection=false;
} catch (IOException e){
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException("Unexpected interrupt: ", ex);
}
}
}
OutputStream outputStream = socket.getOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(obj);
objectOutputStream.flush();
objectOutputStream.close();
}
РЕДАКТИРОВАТЬ: код не вызывает каких-либо исключений, это то, что редуктор или некоторые из них могут застрять в строке socket.accept()
.