Witajcie!
Piszę prosty serwer obsługujący z góry narzuconą ilość jednoczesnych połączeń. Ma ich być docelowo mało: dwa lub trzy.
Mój serwer to prosty Singleton
. Problem mam z klasami, które miałyby implementować jednoczesną obsługę klientów.
Korzystając z semafora nadaję uprawnienia do połączenia lub je odrzucam. Chciałbym, aby każdy klient miał swój wątek działający asynchronicznie, jednocześnie pozostawiając możliwość serwerowi kontroli nad nimi. Myślałem o tym, żeby każde połączenie było reprezentowane przez Future
, dzięki czemu miałbym możliwość wywołania w razie czego cancel(true)
, które z użyciem nieludzkiej siły zakończyłoby komunikację. Problem polega w implementacji. Wpadłem na pomysł, aby stworzyć 2 klasy:
- klasa zewnętrzna nadzorująca połączenie (z referencją do semafora, aby po zakończeniu komunikacji wywołać
semaphore.release()
) - klasa wewnętrzna odbierająca dane (na ten czas)
Pytanie brzmi: która z nich musiałaby implementować implementować interfejs Runnable
, żeby nie zatrzymać mi głównego wątku serwera? Czy obydwie powinny go implementować, aby móc jednocześnie odbierać dane i mieć możliwość ubicia komunikacji?
Już tyle razy usuwałem wszystko i próbowałem pisać od nowa, że potrzebuję pomocy.
Serwer:
package application.server;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class Server {
private int numberOfClients = 2;
private int currentClientNumber = 0;
private static Server server = null;
private Semaphore semaphore = new Semaphore(numberOfClients);
private ExecutorService executorService = Executors.newCachedThreadPool();
private ConnectionManager[] clients = new ConnectionManager[numberOfClients];
public static Server getInstance(int port) {
if (server == null) {
server = new Server(port);
}
return server;
}
private Server(int port) {
startServer(port);
}
private void startServer(int port) {
try (ServerSocket serverSocket = new ServerSocket(port))
{
Socket socket;
while (true) {
socket = serverSocket.accept();
if (isSessionAvailable()) {
semaphore.acquire();
clients[currentClientNumber++] = new ConnectionManager(socket, executorService, semaphore);
executorService.execute(clients[currentClientNumber-1]);
} else {
socket.getOutputStream().write("Too many clients!".getBytes());
socket.close();
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private boolean isSessionAvailable() {
return semaphore.availablePermits() > 0;
}
}
ConnectionManager:
package application.server;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
public class ConnectionManager implements Runnable{
private final Semaphore semaphore;
private final Socket socket;
private final ExecutorService executorService;
private Future connection;
public ConnectionManager(Socket socket, ExecutorService executorService, Semaphore semaphore) {
this.socket = socket;
this.executorService = executorService;
this.semaphore = semaphore;
}
@Override
public void run() {
startConnection();
}
public void startConnection() {
connection = executorService.submit(new Connection());
try {
connection.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
public boolean killConnection() {
connection.cancel(true);
return connection.isCancelled();
}
class Connection implements Runnable{
@Override
public void run() {
try (
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()))
)
{
String input;
while (!(input = reader.readLine()).equals("exit")) {
System.out.println(input);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Client:
package application.client;
import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
public class Client {
final static Logger logger = Logger.getLogger(Client.class);
private int port;
private String host;
public Client(int port, String host) {
this.port = port;
this.host = host;
}
public void startClientSession() {
try (
Socket socket = new Socket(host, port);
PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
)
{
logger.info("Started client service");
String line;
while (!(line = in.readLine()).equals("exit")) {
out.write(line);
}
logger.info("Ended client service");
} catch (UnknownHostException e) {
logger.error("The host is unknown!", e);
} catch (IOException e) {
logger.error("Caught I/O exception", e);
}
}
}
Przykładowe wywołanie czegoś takiego daje:
hihellohi all!
java.util.concurrent.ExecutionException: java.lang.NullPointerException
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at application.server.ConnectionManager.startConnection(ConnectionManager.java:33)
at application.server.ConnectionManager.run(ConnectionManager.java:27)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at application.server.ConnectionManager$Connection.run(ConnectionManager.java:58)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
- screenshot-20170824202427.png (240 KB) - ściągnięć: 114