У меня есть Java-программа, которая использует потоки для передачи данных через сокеты. У меня проблема в диспетчере серверов при создании пула потоков. Если, например, я создаю пул потоков из 3, ожидая подключения 3 клиентов, а затем вручную запускаю CapitalizeClient.java 3 раза, все работает нормально, и я могу прекрасно управлять потоками / закрывать сокеты, но если я создаю тот же поток-Пул, а затем автоматизировать создание CapitalizeClient.java путем создания экземпляра класса в цикле, я сталкиваюсь с огромной проблемой, заключающейся в том, что когда я собираюсь закрыть сокет, все сокеты во всей моей программе закрываются, и все отключается.
Вот класс, который доставляет мне неприятности
package parallel_hw_6;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.util.Timer;
import java.util.TimerTask;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.swing.JFrame;
import javax.swing.JOptionPane;
import javax.swing.JScrollPane;
import javax.swing.JTextArea;
import javax.swing.JTextField;
public class CapitalizeClient {
private BufferedReader in;
private PrintWriter out;
private JFrame frame = new JFrame("Capitalize Client");
private JTextField dataField = new JTextField(40);
private JTextArea messageArea = new JTextArea(8, 60);
private String name;
private JTextArea log = new JTextArea(20,20);
/**
* Constructs the client by laying out the GUI and registering a
* listener with the textfield so that pressing Enter in the
* listener sends the textfield contents to the server.
*/
public CapitalizeClient() {
// Layout GUI
messageArea.setEditable(false);
frame.getContentPane().add(dataField, "North");
frame.getContentPane().add(new JScrollPane(messageArea), "Center");
frame.getContentPane().add(new JScrollPane(log), "East");
// Add Listeners
dataField.addActionListener(new ActionListener() {
/**
* Responds to pressing the enter key in the textfield
* by sending the contents of the text field to the
* server and displaying the response from the server
* in the text area. If the response is "." we exit
* the whole application, which closes all sockets,
* streams and windows.
*/
public void actionPerformed(ActionEvent e) {
out.println(dataField.getText());
}
});
}
/**
* Implements the connection logic by prompting the end user for
* the server's IP address, connecting, setting up streams, and
* consuming the welcome messages from the server. The Capitalizer
* protocol says that the server sends three lines of text to the
* client immediately after establishing a connection.
*/
public void connectToServer() throws IOException {
// Get the server address from a dialog box.
String serverAddress = JOptionPane.showInputDialog(
frame,
"Enter IP Address of the Server:",
"Welcome to the Capitalization Program",
JOptionPane.QUESTION_MESSAGE);
// Make connection and initialize streams
Socket socket = new Socket(serverAddress, 9898);
in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
// Consume the initial welcoming messages from the server
for (int i = 0; i < 3; i++) {
messageArea.append(in.readLine() + "\n");
System.out.println("initial message: "+i);
}
name = in.readLine();
Timer timer = new Timer();
TimerTask serverHeartbeat = new TimerTask() {
@Override
public void run() {
out.println(name+"_IS_ALIVE");
}
};
TimerTask threadHeartbeat = new TimerTask() {
@Override
public void run() {
out.println(name+"_ALIVE");
}
};
timer.schedule(serverHeartbeat, 1000, 1000);
timer.schedule(threadHeartbeat, 5000, 5000);
String response;
try {
while((response = in.readLine()) != null){
System.out.println("The input is: " + response);
Matcher m2 = Pattern.compile("([\\w]*)_ALIVE").matcher(response);
if (response == null || response.equals("")) {
System.exit(0);
}else if("CLEAR_CONSOLE".equals(response)){
messageArea.setText(null);
}else if(m2.matches()){
log.append(response+"\n");
}else{
messageArea.append(response + "\n");
dataField.selectAll();
}
}
if (response == null || response.equals("")) {
System.exit(0);
}
} catch (IOException ex) {
response = "Error: " + ex;
}
}
/**
* Runs the client application.
*/
public static void main(String[] args) throws Exception {
CapitalizeClient client = new CapitalizeClient();
client.frame.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
client.frame.pack();
client.frame.setVisible(true);
client.connectToServer();
}
}
Вот как я пытаюсь создать экземпляр n = экземпляров размера пула потока дляconnect
for(int i = 0; i < numClients; i++){
(new Thread() {
@Override
public void run() {
try {
CapitalizeClient.main(null);
} catch (Exception ex) {
Logger.getLogger(CapitalizeServer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}).start();
}
Примечание. Когда я создаю несколько экземпляров класса таким образом, они действительно отображаются и подключаются должным образом, а также позволяют отправлять сообщения всем им или даже определенным,просто когда я иду, чтобы закрыть сокет, заканчивая соединение с одним из клиентов, он выключает все клиенты иru менеджер сервера.
Вот копия кода менеджера сервера для справки.
package parallel_hw_6;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class CapitalizeServer {
private static ArrayList<Capitalizer> allClients = new ArrayList<Capitalizer>();
private static ServerSocket listener;
private static ServerSocket listener2;
private static ExecutorService pool = Executors.newCachedThreadPool();
private static int threadPoolNum = 0;
private static Settings s;
private static RunIt runIt;
/**
* Application method to run the server runs in an infinite loop listening
* on port 9898. When a connection is requested, it spawns a new thread to
* do the servicing and immediately returns to listening. The server keeps a
* unique client number for each client that connects just to show
* interesting logging messages. It is certainly not necessary to do this.
*/
public static void main(String[] args) throws Exception {
System.out.println("The capitalization server is running.");
int clientNumber = 0;
int settingsClientNumber = 0;
listener = new ServerSocket(9898);
listener2 = new ServerSocket(9899);
// new Settings(listener2.accept(), settingsClientNumber++).start();
s = new Settings(listener2.accept(), settingsClientNumber++);
s.start();
// threadPool(2);
runIt = new RunIt();
try {
while (true) {
// new Capitalizer(listener.accept(), clientNumber++).start();
Capitalizer c = new Capitalizer(listener.accept(), clientNumber++, false, s);
Thread t = new Thread(c);
t.start();
}
} finally {
listener.close();
}
}
/**
* A private thread to handle capitalization requests on a particular
* socket. The client terminates the dialogue by sending a single line
* containing only a period.
*/
private static class Capitalizer implements Runnable {
private Socket socket;
private int clientNumber;
private String name;
private BufferedReader in;
private PrintWriter out;
private boolean fromThreadPool;
private Settings admin;
// private PrintWriter adminOut;
public Capitalizer(Socket socket, int clientNumber, boolean fromWhere, Settings admin) {
this.admin = admin;
// this.setName("Thread" + clientNumber);
this.fromThreadPool = fromWhere;
if (this.fromThreadPool == true) {
this.name = "Threadpool_Thread_" + clientNumber;
} else {
this.name = "Standard_Thread_" + clientNumber;
}
this.socket = socket;
this.clientNumber = clientNumber;
log("\nNew connection with client# " + clientNumber + " at " + socket);
allClients.add(this);
System.out.print("\n" + allClients);
try {
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
// adminOut = new PrintWriter(adminSocket.getOutputStream(), true);
} catch (Exception e) {
}
}
/**
* Services this thread's client by first sending the client a welcome
* message then repeatedly reading strings and sending back the
* capitalized version of the string.
*/
public void run() {
try {
// Decorate the streams so we can send characters
// and not just bytes. Ensure output is flushed
// after every newline.
// Send a welcome message to the client.
out.println("Hello, your name is " + name + ".");
out.println("Enter a line with only a period to quit\n");
out.println(name);
// Get messages from the client, line by line; return them
// capitalized
while (true) {
String input = in.readLine();
// System.out.println("\nInput check: "+ input);
Matcher m = Pattern.compile("([\\w]*)_IS_ALIVE").matcher(input);
Matcher m2 = Pattern.compile("([\\w]*)_ALIVE").matcher(input);
if (input == null || input.equals(".")) {
break;
} else if (m.matches()) {
admin.showHeartbeat(input);
} else if (m2.matches()) {
heartbeatToAll(input, this.name);
} else {
out.println(input.toUpperCase());
}
// out.println(input.toUpperCase());
}
} catch (IOException e) {
log("Error handling client# " + clientNumber + ": " + e);
} finally {
try {
socket.close();
} catch (IOException e) {
log("Couldn't close a socket, what's going on?");
}
log("Connection with client# " + clientNumber + " closed");
}
}
private void sendMessage(String message) {
try {
out.println(message);
} catch (Exception e) {
log("Error: Message could not be sent");
}
}
private void terminate() {
try {
socket.close();
} catch (Exception e) {
log("Couldn't close a socket, what's going on?");
}
}
/**
* Logs a simple message. In this case we just write the message to the
* server applications standard output.
*/
private void log(String message) {
System.out.println(message);
}
}
private static void threadPool(int numClients) {
try {
Thread waitForConnection = new Thread() {
@Override
public void run() {
try {
for (int i = 0; i < numClients; i++) {
System.out.print("Threadpool" + pool);
Capitalizer c = new Capitalizer(listener.accept(), threadPoolNum++, true, s);
pool.execute(c);
}
} catch (IOException ex) {
Logger.getLogger(CapitalizeServer.class.getName()).log(Level.SEVERE, null, ex);
}
}
};
waitForConnection.start();
System.out.println("\nInside threadPool() - Number of threads in pool is " + numClients);
// Works but closes all if you close one
for (int i = 0; i < numClients; i++) {
(new Thread() {
@Override
public void run() {
try {
CapitalizeClient.main(null);
// CapitalizeClient test = new CapitalizeClient();
// test.main(null);
} catch (Exception ex) {
Logger.getLogger(CapitalizeServer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}).start();
}
} catch (Exception ex) {
Logger.getLogger(CapitalizeServer.class.getName()).log(Level.SEVERE, null, ex);
}
}
private static void heartbeatToAll(String message, String name) {
String m = message;
String n = name;
for (int i = 0; i < allClients.size(); i++) {
String cName = allClients.get(i).name;
if (!cName.equals(n)) {
allClients.get(i).sendMessage(message);
}
}
}
private static class Settings extends Thread {
private Socket socket;
private int clientNumber;
private static PrintWriter out;
public Settings(Socket socket, int clientNumber) {
this.socket = socket;
this.clientNumber = clientNumber;
log("\nNew connection with settings client# " + clientNumber + " at " + socket);
}
/**
* Services this thread's client by first sending the client a welcome
* message then repeatedly reading strings and sending back the
* capitalized version of the string.
*/
public void run() {
try {
// Decorate the streams so we can send characters
// and not just bytes. Ensure output is flushed
// after every newline.
BufferedReader in = new BufferedReader(
new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(), true);
// Send a welcome message to the client.
out.println("Hello, you are admin client #" + clientNumber + ".");
out.println("Enter a line with only a period to quit\n");
// Get messages from the client, line by line; return them
// capitalized
while (true) {
String input = in.readLine();
int intInput = 0;
Matcher m = Pattern.compile("([\\w]*)_IS_ALIVE").matcher(input);
if (input == null || input.equals(".")) {
break;
} else if (m.matches()) {
out.println(input);
break;
} else {
try {
intInput = Integer.parseInt(input);
System.out.print("\nintInput: " + intInput);
switch (intInput) {
case (1):
out.println("Which thread should I terminate?");
for (int i = 0; i < allClients.size(); i++) {
String toAdd = i + " : " + allClients.get(i).name;
out.println(toAdd);
}
while (true) {
String choice1 = in.readLine();
try {
int intChoice1 = Integer.parseInt(choice1);
System.out.print("\nintChoice1: " + intChoice1);
allClients.get(intChoice1).terminate();
out.println("CLEAR_CONSOLE");
out.println("Client number " + intChoice1 + " has been terminated");
if (allClients.get(intChoice1).fromThreadPool == true) {
threadPool(1);
}
allClients.remove(intChoice1);
System.out.println("\nall clients: " + allClients);
break;
} catch (Exception e) {
out.println("Enter a valid client number.");
log(e.toString());
}
}
break;
case (2):
out.println("Send to one or all?");
out.println("1. Choose 1");
out.println("2. Send to all");
while (true) {
String allOrOne = in.readLine();
try {
int intAllOrOne = Integer.parseInt(allOrOne);
System.out.print("\nintAllOrOne: " + intAllOrOne);
switch (intAllOrOne) {
case (1):
out.println("CLEAR_CONSOLE");
out.println("Which do you want to send to?");
for (int i = 0; i < allClients.size(); i++) {
String toAdd = Integer.toString(allClients.get(i).clientNumber);
String toPrint = i + " : " + allClients.get(i).name;
out.println(toPrint);
}
while (true) {
String sendToWho = in.readLine();
try {
int intSendToWho = Integer.parseInt(sendToWho);
System.out.print("\nintsendToWho: " + intSendToWho);
out.println("CLEAR_CONSOLE");
out.println("Enter message to send.");
while (true) {
String message = in.readLine();
System.out.print("\nmessage: " + message);
try {
allClients.get(intSendToWho).sendMessage(message);
break;
} catch (Exception e) {
out.print("Enter a valid client number.");
log(e.toString());
}
}
out.println("CLEAR_CONSOLE");
break;
} catch (Exception e) {
out.println("Enter a valid choice.");
log(e.toString());
}
}
break;
case (2):
out.println("CLEAR_CONSOLE");
out.println("Enter message to send test.");
while (true) {
String message = in.readLine();
System.out.print("\nmessage: " + message);
try {
for (int i = 0; i < allClients.size(); i++) {
allClients.get(i).sendMessage(message);
}
out.println("CLEAR_CONSOLE");
break;
} catch (Exception e) {
out.print("Enter a valid client number.");
log(e.toString());
}
}
break;
}
break;
} catch (Exception e) {
log(e.toString());
}
}
break;
case (3):
out.println("How many threads in the pool?");
while (true) {
String numThreads = in.readLine();
try {
int intNumThreads = Integer.parseInt(numThreads);
System.out.print("\nintNumThreads: " + intNumThreads);
out.println("CLEAR_CONSOLE");
out.println("Thread pool with" + intNumThreads + " has been created!");
out.println("Spawn new instance(s) of 'CapitalizeClient' class to utilize it.");
threadPool(intNumThreads);
break;
} catch (Exception e) {
out.println("Enter a valid number.");
log(e.toString());
}
}
break;
default:
out.println("Please enter a valid option.");
break;
}
} catch (Exception e) {
System.out.print("\nnumber format exception\n");
out.println("Please enter a valid option.");
}
}
}
} catch (IOException e) {
log("Error handling client# " + clientNumber + ": " + e);
} finally {
System.out.print("FINALLY IS EXECUTING!");
try {
socket.close();
} catch (IOException e) {
log("Couldn't close a socket, what's going on?");
}
log("Here?Connection with client# " + clientNumber + " closed");
}
}
private static void showHeartbeat(String h) {
out.println(h);
}
/**
* Logs a simple message. In this case we just write the message to the
* server applications standard output.
*/
private void log(String message) {
System.out.println(message);
}
}
}