Сокет Mutil-Thread Слишком много открытых файлов - PullRequest
0 голосов
/ 03 июня 2018

Я получаю

java.net.SocketException: Too many open files.

Мой код:

public class EchoServer {

    public static ExecutorService executorService;
    public static final String NEWLINE = "\r\n";
    public static long COUNT = 0;

    public EchoServer(int port) throws IOException {
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress("127.0.0.1", port));
        System.out.println("Starting echo server on port: " + port);
        while (true) {
            long start = System.currentTimeMillis();
            COUNT++;
            Socket socket = serverSocket.accept();
            ProcessTask processTask = new ProcessTask(socket, start);
            executorService.execute(processTask);

        }
    }

    public static void main(String[] args) throws IOException {
        executorService = Executors.newFixedThreadPool(5 * Runtime.getRuntime().availableProcessors());
        new EchoServer(9999);
    }

    public static class ProcessTask implements Runnable {

        private Socket socket;
        private long startTime;

        public ProcessTask(Socket socket, long startTime) {
            this.socket = socket;
            this.startTime = startTime;
        }

        public void run() {
            BufferedReader br = null;
            PrintWriter out = null;

            try {
                br = getReader(socket);
                out = getWriter(socket);

                String msg;
                while ((msg = br.readLine()) != null) {

                    String res = "Server Reply : " + msg;
                    out.println(res);
                    out.flush();
                }
                long end = System.currentTimeMillis();
                System.out.println("Closing connection with client. 耗时 : " + ((end - startTime)));
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    socket.shutdownInput();
                    socket.shutdownOutput();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        private PrintWriter getWriter(Socket socket) throws IOException {
            OutputStream socketOut = socket.getOutputStream();
            return new PrintWriter(socketOut, true);
        }

        private BufferedReader getReader(Socket socket) throws IOException {
            InputStream socketIn = socket.getInputStream();
            return new BufferedReader(new InputStreamReader(socketIn));
        }
    }
}

public class EchoClient {

    public static final int port = 9999;
    public static final String NEWLINE = "\r\n";
    public static final long NANOSECONDS_PER_SECOND = 1000 * 1000 * 1000;
    public static final long REQUESTS_PER_SECOND    = 1000 * 1000;

    public static long COUNT = 0;

    public static void main(String args[]) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                50, 50, 3000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());

        List<Task> tasks = new ArrayList<>();

        for (int i = 0; i < 10000; i++) {
            tasks.add(new Task(i, port));
        }

        boolean flag = false;
        while (true) {
            tasks.stream().forEach(
                    task ->
                    {
                        threadPoolExecutor.submit(task);
                        COUNT++;
                        try {
                            long sleep_time = NANOSECONDS_PER_SECOND / REQUESTS_PER_SECOND;
                            TimeUnit.NANOSECONDS.sleep(sleep_time);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
            );
            if (flag) {
                break;
            }
        }

        threadPoolExecutor.shutdown();
    }

    public static class Task implements Callable<Long> {

        private int port;
        private int id;
        private String taskName;

        public Task(int id, int port) {
            this.id = id;
            this.port = port;
            this.taskName = "Client_" + this.id;
        }

        public Long call() {
            long start = -1;
            long end = -1;
            try {
                Socket socket = new Socket("127.0.0.1", port);
                start = System.currentTimeMillis();
                String msg = "From " + taskName;
                msg = msg + NEWLINE;
                for (int i = 1; i <= 1; i++) {
                    OutputStream socketOut = null;
                    BufferedReader br = null;
                    try {
                        socketOut = socket.getOutputStream();
                        socketOut.write(msg.getBytes());
                        socketOut.flush();

                        br = new BufferedReader(new InputStreamReader(
                                socket.getInputStream()));
                        String res = br.readLine();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {
                        socket.shutdownInput();
                        socket.shutdownOutput();
                    }
                }
                end = System.currentTimeMillis();

                System.out.println(taskName + " 完成发送数据!" + " 耗时 : " + ((end - start)));
            } catch (IOException e) {
                e.printStackTrace();
            }
            return (end - start);
        }
    }
}

➜  Client git:(2.0-SNAPSHOT) ✗ ulimit -a                                                       
-t: cpu time (seconds)              unlimited
-f: file size (blocks)              unlimited
-d: data seg size (kbytes)          unlimited
-s: stack size (kbytes)             8192
-c: core file size (blocks)         0
-v: address space (kbytes)          unlimited
-l: locked-in-memory size (kbytes)  unlimited
-u: processes                       2128
-n: file descriptors                1048600
➜  Client git:(2.0-SNAPSHOT) ✗ sysctl net.inet.ip.portrange                                    
net.inet.ip.portrange.lowfirst: 1023
net.inet.ip.portrange.lowlast: 600
net.inet.ip.portrange.first: 1024
net.inet.ip.portrange.last: 65535
net.inet.ip.portrange.hifirst: 49152
net.inet.ip.portrange.hilast: 65535
➜  Client git:(2.0-SNAPSHOT) ✗

1 Ответ

0 голосов
/ 03 июня 2018

Вы пропускаете FD, потому что никогда не закрываете принятые сокеты или подключенные клиентские.Вы закрываете их, но этого недостаточно, если вы закрываете их.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...