В рамках проекта курсовой работы я пытаюсь реализовать уровень надежности поверх базового протокола UDP в Java, используя выборочное повторение: http://en.wikipedia.org/wiki/Selective_Repeat_ARQ. По сути, каждый пакет - при отправке - отслеживает свой собственный таймерв отдельной теме.Если какой-либо конкретный таймер заканчивается, пакет отправляется повторно.
При относительно больших настройках тайм-аута (например, 500 мс) этот код выполняется нормально, и большой файл полностью отправляется получателю.Тем не менее, при меньшем тайм-ауте (например, 20 мс) я получаю следующую ошибку при рассылке спама на терминал:
java.nio.channels.ClosedChannelException
at sun.nio.ch.DatagramChannelImpl.ensureOpen(DatagramChannelImpl.java:132)
at sun.nio.ch.DatagramChannelImpl.send(DatagramChannelImpl.java:241)
at Sender4.sendPak(Sender4.java:118)
at Sender4.access$000(Sender4.java:8)
at Sender4$packetTimer.run(Sender4.java:135)
Из того, что я вижу, однако, канал не закрыт.Документация для этого исключения гласит:
Проверенное исключение, выдаваемое при попытке вызвать или завершить операцию ввода-вывода для канала, который закрыт или, по крайней мере, закрыт для этой операции.То, что это исключение выдается, не обязательно означает, что канал полностью закрыт.Например, сокетный канал, половина записи которого была отключена, может быть открыт для чтения.
Что заставляет меня думать, что оно, возможно, закрыто, потому что недоступно в некотором роде.Поскольку это происходит только при меньших значениях тайм-аута, возможно, это связано с тем, что два потока пытаются выполнить повторную отправку одновременно?Однако метод отправки (sendPak) синхронизирован ... поэтому это не должно быть возможным.
В чем причина этой проблемы?Или какое исправление я могу использовать, чтобы избежать этой проблемы?Вот код для части Sender моей программы, я вполне уверен, что с Receiver все в порядке:
/* Craig Innes 0929508 */
import java.io.*;
import java.net.*;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.nio.channels.*;
public class Sender4 {
short base = 0;
short nextSeqNum = 0;
byte[][] packets;
ByteBuffer bb;
String endSys;
int portNum;
String fileName;
int retryTime;
int windowSize;
DatagramSocket clientSocket;
InetAddress IPAddress;
InetSocketAddress destination;
boolean timedOut = false;
int resends = 0;
HashMap<Short, packetTimer> timers = new HashMap<Short, packetTimer>();
DatagramChannel clientChannel;
public Sender4(String endSys, int portNum, String fileName, int retryTime, int windowSize){
this.endSys = endSys;
this.portNum = portNum;
this.fileName = fileName;
this.retryTime = retryTime;
this.windowSize = windowSize;
}
public static void main(String args[]) throws Exception{
//Check for current arguments and assign them
if(args.length != 5){
System.out.println("Invalid number of arguments. Please specify: <endSystem> <portNumber> <fileName> <retryTimeout><windowSize>");
System.exit(1);
}
Sender4 sendy = new Sender4(args[0], Integer.parseInt(args[1]), args[2], Integer.parseInt(args[3]), Integer.parseInt(args[4]));
sendy.go();
}
private void go() throws Exception{
clientChannel = DatagramChannel.open();
clientChannel.configureBlocking(false);
bb = ByteBuffer.allocate(2);
byte[] picData = new byte[1021];
byte[] sendData = new byte[1024];
byte[] seqBytes = new byte[2];
byte EOFFlag = 0;
boolean acknowledged = false;
int resends = 0;
IPAddress = InetAddress.getByName(endSys);
destination = new InetSocketAddress(IPAddress, portNum);
FileInputStream imReader = new FileInputStream(new File(fileName));
double fileSizeKb = imReader.available() / 1021.0; //We add 3 bytes to every packet, so dividing by 1021 will give us total kb sent.
int packetsNeeded = (int) Math.ceil(fileSizeKb);
packets = new byte[packetsNeeded][];
long startTime = System.currentTimeMillis();
long endTime;
double throughput;
//Create array of packets to send
for(int i = 0; i < packets.length; i++){
if(i == packets.length - 1){
EOFFlag = 1;
picData = new byte[imReader.available()];
sendData = new byte[picData.length + 3];
}
imReader.read(picData);
bb.putShort((short)i);
bb.flip();
seqBytes = bb.array();
bb.clear();
System.arraycopy(seqBytes, 0, sendData, 0, seqBytes.length);
sendData[2] = EOFFlag;
System.arraycopy(picData, 0, sendData, 3, picData.length);
packets[i] = (byte[])sendData.clone();
}
//System.out.println("timeout is: " + timedOut + " base is: " + base + " packet length is: " + packets.length + " nextSeqNum: " + nextSeqNum);
while(base != packets.length || !timers.isEmpty()){
while(nextSeqNum - base < windowSize && nextSeqNum < packets.length){
System.out.println("sending packet with seqNum: " + nextSeqNum);
sendPak(nextSeqNum);
timers.put(nextSeqNum, new packetTimer(nextSeqNum));
timers.get(nextSeqNum).start();
System.out.println("nextSeq: " + nextSeqNum + "base " + base + "windowSize " + windowSize + "timer size" + timers.size());
nextSeqNum++;
}
//Done all the sending we can, have a check for any ACKs we have received...
getACK();
}
endTime = System.currentTimeMillis();
throughput = 1000 * fileSizeKb / (endTime - startTime);
clientChannel.close();
imReader.close();
System.out.println("Number of retransmissions: " + resends);
System.out.println("Average throughput is: " + throughput + "Kb/s");
}
private synchronized void sendPak(short resNum) throws IOException{
//System.out.println("Timed out waiting for acknowledgement, resending all unACKed packets in window");
ByteBuffer sendBuff = ByteBuffer.wrap(packets[resNum]);
clientChannel.send(sendBuff, destination);
sendBuff.clear();
}
private class packetTimer extends Thread{
short sendingNum;
boolean timeToStop = false;
boolean fileACKed = false;
public packetTimer(short seqNum){
sendingNum = seqNum;
}
public void run() {
//If packet times out - resend. If thread interrupted, we have received the corresponding ack
while(waitForACK()){
System.out.println("Packet timed out. Resending packet: " + sendingNum);
try{
sendPak(sendingNum);
}catch(IOException ex){
System.out.println("I think this is causing the problems");
ex.printStackTrace();
}
}
System.out.println("Thread" + sendingNum + "has reached completion");
}
private boolean waitForACK(){
if(this.interrupted()){
return false;
}
try{
Thread.sleep(retryTime);
}catch(InterruptedException ex){
return false;
}
return true;
}
}
private synchronized void getACK() throws Exception{
//Listen out for ACKs and update pointers accordingly
ByteBuffer ackBuff;
byte[] ackData = new byte[2];
ackBuff = ByteBuffer.wrap(ackData);
SocketAddress recked = clientChannel.receive(ackBuff);
if(recked != null){ //Only if it actually receives anything, check for nullity
//System.out.println("ACK buff size: " + ackBuff.capacity() + "Current position: " + ackBuff.position() + "remaining: " + ackBuff.remaining());
ackBuff.flip();
short ack = ackBuff.getShort();
System.out.println("ack received: " + ack);
ackBuff.clear();
if(timers.containsKey(ack)){ //Stop Timer
System.out.println("Interrupting timer: " + ack);
timers.get(ack).interrupt();
timers.get(ack).fileACKed = true;
}
if(base == ack){ //If you receive ack for the base, remove all the consecutively stopped timers
while(timers.containsKey(base) && timers.get(base).fileACKed){
System.out.println("Removing: " + base);
timers.remove(base);
base++;
}
}
//System.out.println("acknowledgement for base num: " + base + "ack num:" + ack);
}
System.out.println("Waiting for base: " + base + "packets length is " + packets.length + "timers size is: " + timers.size() + "but is it empty? " + timers.isEmpty());
Thread.yield();
}
}