У нас тут несколько проблем:
- Мы не можем просто обернуть SocketFactories друг в друга, как мы можем это сделать для InputStreams и OutputStreams.
- Java-основанный на zlib DeflatorOutputStream не реализует сброс.
Я думаю, что нашел механизм, как это могло бы работать.
Это будет серия из нескольких частей, так как для ее написания требуется некоторое время.
(Вы можете найти исходный код завершенного материала в моем репозитории github ).
Пользовательский SocketImpl
A Socket
всегда основан на объекте, реализующем SocketImpl
. Таким образом, наличие собственного сокета фактически означает использование собственного класса SocketImpl. Вот реализация, основанная на паре потоков (и базовом сокете, для целей закрытия):
/**
* A SocketImpl implementation which works on a pair
* of streams.
*
* A instance of this class represents an already
* connected socket, thus all the methods relating to
* connecting, accepting and such are not implemented.
*
* The implemented methods are {@link #getInputStream},
* {@link #getOutputStream}, {@link #available} and the
* shutdown methods {@link #close}, {@link #shutdownInput},
* {@link #shutdownOutput}.
*/
private static class WrappingSocketImpl extends SocketImpl {
private InputStream inStream;
private OutputStream outStream;
private Socket base;
WrappingSocketImpl(StreamPair pair, Socket base) {
this.inStream = pair.input;
this.outStream = pair.output;
this.base = base;
}
A StreamPair
- простой класс держателя данных, см. Ниже.
Это важные методы:
protected InputStream getInputStream() {
return inStream;
}
protected OutputStream getOutputStream() {
return outStream;
}
protected int available() throws IOException {
return inStream.available();
}
Тогда некоторые методы, чтобы разрешить закрытие. Они на самом деле не проверены (может быть, мы также должны закрыть или хотя бы очистить потоки?), Но, похоже, это работает для нашего использования RMI.
protected void close() throws IOException {
base.close();
}
protected void shutdownInput() throws IOException {
base.shutdownInput();
// TODO: inStream.close() ?
}
protected void shutdownOutput() throws IOException {
base.shutdownOutput();
// TODO: outStream.close()?
}
Следующие несколько методов будут вызываться конструктором Socket (или косвенно чем-то в движке RMI), но на самом деле ничего делать не нужно.
protected void create(boolean stream) {
if(!stream) {
throw new IllegalArgumentException("datagram socket not supported.");
}
}
public Object getOption(int optID) {
System.err.println("getOption(" + optID + ")");
return null;
}
public void setOption(int optID, Object value) {
// noop, as we don't have any options.
}
Все остальные методы не обязательны, мы реализуем их, генерируя исключения (поэтому мы заметим, что это предположение было неверным).
// unsupported operations
protected void connect(String host, int port) {
System.err.println("connect(" + host + ", " + port + ")");
throw new UnsupportedOperationException();
}
protected void connect(InetAddress address, int port) {
System.err.println("connect(" + address + ", " + port + ")");
throw new UnsupportedOperationException();
}
protected void connect(SocketAddress addr, int timeout) {
System.err.println("connect(" + addr + ", " + timeout + ")");
throw new UnsupportedOperationException();
}
protected void bind(InetAddress host, int port) {
System.err.println("bind(" + host + ", " + port + ")");
throw new UnsupportedOperationException();
}
protected void listen(int backlog) {
System.err.println("listen(" + backlog + ")");
throw new UnsupportedOperationException();
}
protected void accept(SocketImpl otherSide) {
System.err.println("accept(" + otherSide + ")");
throw new UnsupportedOperationException();
}
protected void sendUrgentData(int data) {
System.err.println("sendUrgentData()");
throw new UnsupportedOperationException();
}
}
Вот StreamPair, используемый конструктором:
/**
* A simple holder class for a pair of streams.
*/
public static class StreamPair {
public InputStream input;
public OutputStream output;
public StreamPair(InputStream in, OutputStream out) {
this.input = in; this.output = out;
}
}
Следующая часть: используйте это для реализации фабрики сокетов.
Фабрика Сокетов, обертывающая другую.
Здесь мы имеем дело с фабриками сокетов RMI (т. Е. RMIClientSocketFactory , RMIServerSocketFactory , RMISocketFactory в java.rmi.server), но та же идея применима к другие библиотеки, использующие интерфейс фабрики сокетов. Примеры: javax.net.SocketFactory (и ServerSocketFactory ), Apache Axis ' SocketFactory , JSch's SocketFactory .
Часто идея этих фабрик заключается в том, что они каким-то образом подключаются к другому серверу, а не к исходному ( прокси ), затем выполняют некоторые переговоры, и любой из них может продолжать работу в том же соединении или должен туннелировать реальное соединение через какой-то другой протокол (используя обтекание потоков). Вместо этого мы хотим позволить какой-нибудь другой фабрике сокетов выполнить исходное соединение, а затем делать только поток, обертывающий себя.
RMI имеет отдельные интерфейсы для фабрик клиентских и серверных сокетов. Фабрики клиентских сокетов будут сериализованы и переданы с сервера клиенту вместе с удаленными заглушками, что позволит клиенту достичь сервера.
Существует также абстрактный класс RMISocketFactory
, реализующий оба интерфейса и обеспечивающий глобальную фабрику сокетов по умолчанию VM, которая будет использоваться для всех удаленных объектов, у которых нет своих собственных.
Теперь мы реализуем подкласс этого класса (и, следовательно, также реализуем оба интерфейса), позволяя пользователю указать базовую фабрику сокетов клиента и сервера, которую мы затем будем использовать. Наш класс должен быть сериализуемым, чтобы передавать его клиентам.
/**
* A base class for RMI socket factories which do their
* work by wrapping the streams of Sockets from another
* Socket factory.
*
* Subclasses have to overwrite the {@link #wrap} method.
*
* Instances of this class can be used as both client and
* server socket factories, or as only one of them.
*/
public abstract class WrappingSocketFactory
extends RMISocketFactory
implements Serializable
{
(Представьте себе все остальные отступы относительно этого класса.)
Как мы хотим сослаться на другие фабрики, здесь поля.
/**
* The base client socket factory. This will be serialized.
*/
private RMIClientSocketFactory baseCFactory;
/**
* The base server socket factory. This will not be serialized,
* since the server socket factory is used only on the server side.
*/
private transient RMIServerSocketFactory baseSFactory;
Они будут инициализированы простыми конструкторами (которые я здесь не повторяю - посмотрите полный репозиторий в github-репозитории).
Аннотация wrap
Метод
Чтобы сделать это "обертывание фабрик сокетов" общим, мы используем здесь только общий механизм и выполняем фактическое оборачивание потоков в подклассы. Тогда мы можем иметь сжимающий / распаковывающий подкласс, шифрующий, регистрирующий и т. Д.
Здесь мы объявляем только метод wrap
:
/**
* Wraps a pair of streams.
* Subclasses must implement this method to do the actual
* work.
* @param input the input stream from the base socket.
* @param output the output stream to the base socket.
* @param server if true, we are constructing a socket in
* {@link ServerSocket#accept}. If false, this is a pure
* client socket.
*/
protected abstract StreamPair wrap(InputStream input,
OutputStream output,
boolean server);
Этот метод (и тот факт, что Java не допускает множественных возвращаемых значений) является причиной для класса StreamPair. В качестве альтернативы у нас может быть два отдельных метода, но в некоторых случаях (например, для SSL) необходимо знать, какие два потока сопряжены.
Фабрика клиентских сокетов
Теперь давайте взглянем на реализацию фабрики клиентских сокетов:
/**
* Creates a client socket and connects it to the given host/port pair.
*
* This retrieves a socket to the host/port from the base client
* socket factory and then wraps a new socket (with a custom SocketImpl)
* around it.
* @param host the host we want to be connected with.
* @param port the port we want to be connected with.
* @return a new Socket connected to the host/port pair.
* @throws IOException if something goes wrong.
*/
public Socket createSocket(String host, int port)
throws IOException
{
Socket baseSocket = baseCFactory.createSocket(host, port);
Мы получаем сокет с нашей базовой фабрики, а затем ...
StreamPair streams = this.wrap(baseSocket.getInputStream(),
baseSocket.getOutputStream(),
false);
... обернуть свои потоки новыми потоками. (Это wrap
должно быть реализовано подклассами, см. Ниже).
SocketImpl wrappingImpl = new WrappingSocketImpl(streams, baseSocket);
Затем мы используем эти потоки для создания нашего WrappingSocketImpl (см. Выше) и передаем его ...
return new Socket(wrappingImpl) {
public boolean isConnected() { return true; }
};
... на новое гнездо. Мы должны создать подкласс Socket
, потому что этот конструктор защищен, но это уместно, поскольку нам также необходимо переопределить метод isConnected
, чтобы он возвращал true
вместо false
. (Помните, что наш SocketImpl уже подключен и не поддерживает подключение.)
}
Для фабрик клиентских сокетов этого уже достаточно. Для фабрик серверных сокетов это немного сложнее.
Упаковка ServerSockets
Кажется, нет никакого способа создать ServerSocket с данным объектом SocketImpl - он всегда использует статический SocketImplFactory. Таким образом, мы теперь создаем подкласс ServerSocket, просто игнорируя его SocketImpl, вместо этого делегируя другому ServerSocket.
/**
* A server socket subclass which wraps our custom sockets around the
* sockets retrieves by a base server socket.
*
* We only override enough methods to work. Basically, this is
* a unbound server socket, which handles {@link #accept} specially.
*/
private class WrappingServerSocket extends ServerSocket {
private ServerSocket base;
public WrappingServerSocket(ServerSocket b)
throws IOException
{
this.base = b;
}
Оказывается, мы должны реализовать это getLocalPort
, поскольку этот номер отправляется клиентам с удаленной заглушкой.
/**
* returns the local port this ServerSocket is bound to.
*/
public int getLocalPort() {
return base.getLocalPort();
}
Следующий метод является важным. Он работает аналогично нашему createSocket()
методу, описанному выше.
/**
* accepts a connection from some remote host.
* This will accept a socket from the base socket, and then
* wrap a new custom socket around it.
*/
public Socket accept() throws IOException {
Мы разрешаем базовому ServerSocket принимать соединение, а затем переносим его потоки:
final Socket baseSocket = base.accept();
StreamPair streams =
WrappingSocketFactory.this.wrap(baseSocket.getInputStream(),
baseSocket.getOutputStream(),
true);
Затем мы создаем наш WrappingSocketImpl, ...
SocketImpl wrappingImpl =
new WrappingSocketImpl(streams, baseSocket);
... и создать еще один анонимный подкласс Socket:
// For some reason, this seems to work only as a
// anonymous direct subclass of Socket, not as a
// external subclass. Strange.
Socket result = new Socket(wrappingImpl) {
public boolean isConnected() { return true; }
public boolean isBound() { return true; }
public int getLocalPort() {
return baseSocket.getLocalPort();
}
public InetAddress getLocalAddress() {
return baseSocket.getLocalAddress();
}
};
Для этого нужны некоторые переопределенные методы, так как они, похоже, вызываются механизмом RMI.
Я пытался поместить их в отдельный (нелокальный) класс, но это не сработало (выдал исключения на стороне клиента при подключении). Понятия не имею почему. Если у кого-то есть идея, мне интересно.
return result;
}
}
Имея этот подкласс ServerSocket, мы можем завершить наш ...
Завершение фабрики сокетов RMI
/**
* Creates a server socket listening on the given port.
*
* This retrieves a ServerSocket listening on the given port
* from the base server socket factory, and then creates a
* custom server socket, which on {@link ServerSocket#accept accept}
* wraps new Sockets (with a custom SocketImpl) around the sockets
* from the base server socket.
* @param host the host we want to be connected with.
* @param port the port we want to be connected with.
* @return a new Socket connected to the host/port pair.
* @throws IOException if something goes wrong.
*/
public ServerSocket createServerSocket(int port)
throws IOException
{
final ServerSocket baseSocket = getSSFac().createServerSocket(port);
ServerSocket ss = new WrappingServerSocket(baseSocket);
return ss;
}
Не так много, чтобы сказать, все это уже в комментарии. Да, я знаю, что могу сделать все это в одной строке. (Изначально между строками было несколько отладочных выходов.)
Давайте закончим урок:
}
В следующий раз: фабрика розеток.
Заводская розетка.
Чтобы проверить нашу упаковку и посмотреть, достаточно ли сбросов, вот метод wrap
первого подкласса:
protected StreamPair wrap(InputStream in, OutputStream out, boolean server)
{
InputStream wrappedIn = in;
OutputStream wrappedOut = new FilterOutputStream(out) {
public void write(int b) throws IOException {
System.err.println("write(.)");
super.write(b);
}
public void write(byte[] b, int off, int len)
throws IOException {
System.err.println("write(" + len + ")");
super.out.write(b, off, len);
}
public void flush() throws IOException {
System.err.println("flush()");
super.flush();
}
};
return new StreamPair(wrappedIn, wrappedOut);
}
Входной поток используется как есть, выходной поток просто добавляет некоторую регистрацию.
На стороне сервера это выглядит так ([example]
происходит от муравья):
[example] write(14)
[example] flush()
[example] write(287)
[example] flush()
[example] flush()
[example] flush()
[example] write(1)
[example] flush()
[example] write(425)
[example] flush()
[example] flush()
Мы видим, что приливов достаточно, даже более чем достаточно. (Числа являются длинами выходных блоков.)
(На стороне клиента это фактически вызывает исключение java.rmi.NoSuchObjectException. Это работало раньше ... не знаю, почему это не работает сейчас. Поскольку пример сжатия работает, и я устал, я не буду его искать Теперь.)
Далее: сжатие.
Промывка сжатых потоков
Для сжатия в Java есть несколько классов в пакете java.util.zip
. Существует пара DeflaterOutputStream
/ InflaterInputStream
, которая реализует алгоритм сжатия deflate , оборачивая другой поток, фильтруя данные через Deflater
или Inflater
соответственно. Deflater и Inflater основаны на собственных методах, вызывающих общую библиотеку zlib . (На самом деле потоки могут также поддерживать другие алгоритмы, если кто-то предоставляет подклассы с альтернативными реализациями Deflater
и Inflater
.)
(Существуют также DeflaterInputStream и InflaterOutputStream, которые работают наоборот.)
Исходя из этого, GZipOutputStream
и GZipInputStream
реализуют формат файла GZip.(Это добавляет в основном некоторые заголовки и нижний колонтитул, а также контрольную сумму.)
Оба выходных потока имеют проблему (для нашего случая использования), что они действительно не поддерживают flush()
.Это вызвано недостатком определения API Deflater, которому разрешено буферизовать столько данных, сколько нужно до финального finish()
.Zlib позволяет сбрасывать свое состояние, просто Java-оболочка слишком глупа.
С января 1999 г. об этом открыто ошибка # 4206909 , и похоже, что она наконец исправлена для Java 7,Ура!Если у вас есть Java 7, вы можете просто использовать DeflaterOutputStream здесь.
Поскольку у меня еще нет Java 7, я буду использовать обходной путь, опубликованный в комментариях к ошибке 23-го июня 2002 года rsaddey .
/**
* Workaround für kaputten GZipOutputStream, von
* http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4206909
* (23-JUN-2002, rsaddey)
* @see DecompressingInputStream
*/
public class CompressingOutputStream
extends DeflaterOutputStream {
public CompressingOutputStream (final OutputStream out)
{
super(out,
// Using Deflater with nowrap == true will ommit headers
// and trailers
new Deflater(Deflater.DEFAULT_COMPRESSION, true));
}
private static final byte [] EMPTYBYTEARRAY = new byte[0];
/**
* Insure all remaining data will be output.
*/
public void flush() throws IOException {
/**
* Now this is tricky: We force the Deflater to flush
* its data by switching compression level.
* As yet, a perplexingly simple workaround for
* http://developer.java.sun.com/developer/bugParade/bugs/4255743.html
*/
def.setInput(EMPTYBYTEARRAY, 0, 0);
def.setLevel(Deflater.NO_COMPRESSION);
deflate();
def.setLevel(Deflater.DEFAULT_COMPRESSION);
deflate();
out.flush();
}
/**
* Wir schließen auch den (selbst erstellten) Deflater, wenn
* wir fertig sind.
*/
public void close()
throws IOException
{
super.close();
def.end();
}
} // class
/**
* Workaround für kaputten GZipOutputStream, von
* http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4206909
* (23-JUN-2002, rsaddey)
* @see CompressingOutputStream
*/
public class DecompressingInputStream extends InflaterInputStream {
public DecompressingInputStream (final InputStream in) {
// Using Inflater with nowrap == true will ommit headers and trailers
super(in, new Inflater(true));
}
/**
* available() should return the number of bytes that can be read without
* running into blocking wait. Accomplishing this feast would eventually
* require to pre-inflate a huge chunk of data, so we rather opt for a
* more relaxed contract (java.util.zip.InflaterInputStream does not
* fit the bill).
* This code has been tested to work with BufferedReader.readLine();
*/
public int available() throws IOException {
if (!inf.finished() && !inf.needsInput()) {
return 1;
} else {
return in.available();
}
}
/**
* Wir schließen auch den (selbst erstellten) Inflater, wenn
* wir fertig sind.
*/
public void close()
throws IOException
{
super.close();
inf.end();
}
} //class
(Это в пакете de.fencing_game.tools
в моем репозитории github .) В нем есть некоторые комментарии на немецком языке, так как я год назад скопировал это длядругой мой проект.)
При поиске в Stackoverflow я нашел этот ответ BalusC на связанный вопрос , в котором предлагается другой выходящий поток сжатия с оптимизированной очисткой.Я не проверял это, но это может быть альтернативой этому.(Он использует формат gzip , в то время как мы используем здесь чистый формат deflate . Убедитесь, что поток записи и чтения совмещен.)
Другой альтернативой будет использование JZlib , как предложено bestsss, с его ZOutputStream и ZInputStream. В нем немного документации , но я работаю над этим.
В следующий раз: фабрика сжатых RMI-сокетов
Компрессия фабрики RMI-сокетов
Теперь мы можем собрать все это вместе.
/**
* An RMISocketFactory which enables compressed transmission.
* We use {@link #CompressingInputStream} and {@link #CompressingOutputStream}
* for this.
*
* As we extend WrappingSocketFactory, this can be used on top of another
* {@link RMISocketFactory}.
*/
public class CompressedRMISocketFactory
extends WrappingSocketFactory
{
private static final long serialVersionUID = 1;
//------------ Constructors -----------------
/**
* Creates a CompressedRMISocketFactory based on a pair of
* socket factories.
*
* @param cFac the base socket factory used for creating client
* sockets. This may be {@code null}, then we will use the
* {@linkplain RMISocketFactory#getDefault() default socket factory}
* of client system where this object is finally used for
* creating sockets.
* If not null, it should be serializable.
* @param sFac the base socket factory used for creating server
* sockets. This may be {@code null}, then we will use the
* {@linkplain RMISocketFactory#getDefault() default RMI Socket factory}.
* This will not be serialized to the client.
*/
public CompressedRMISocketFactory(RMIClientSocketFactory cFac,
RMIServerSocketFactory sFac) {
super(cFac, sFac);
}
// [snipped more constructors]
//-------------- Implementation -------------
/**
* wraps a pair of streams into compressing/decompressing streams.
*/
protected StreamPair wrap(InputStream in, OutputStream out,
boolean server)
{
return new StreamPair(new DecompressingInputStream(in),
new CompressingOutputStream(out));
}
}
Вот и все.Теперь мы предоставляем этот объект фабрики UnicastRemoteObject.export(...)
в качестве аргументов (как для клиента, так и для фабрики серверов), и вся связь будет сжата.(Версия в моем репозитории github имеет основной метод с примером.)
Конечно, преимущества сжатия не будут такими большими, как RMI, по крайней мере, если вы этого не сделаетепередавать большие строки или подобные вещи в качестве аргументов или возвращаемых значений.
В следующий раз (после того, как я спал): объединить с фабрикой сокетов SSL.
Объединить с фабрикой сокетов SSL
Java-часть этого проста, если мы используем классы по умолчанию:
CompressedRMISocketFactory fac =
new CompressedRMISocketFactory(new SslRMIClientSocketFactory(),
new SslRMIServerSocketFactory());
Эти классы (в javax.rmi.ssl) используют стандартные SSLSocketFactory и SSLServerSocketFactory (в javax.net)..ssl), которые используют системное хранилище ключей и хранилище доверенных сертификатов по умолчанию.
Таким образом, необходимо создать хранилище ключей с парой ключей (например, keytool -genkeypair -v
) и предоставить его ВМ с системными свойствами.javax.net.ssl.keyStore
(имя файла для хранилища ключей) и javax.net.ssl.keyStorePassword
(пароль для хранилища ключей).
На стороне клиента нам нужно хранилище доверенных сертификатов - то есть хранилище ключей, содержащее открытые ключи.,или какой-либо сертификат, который подписал открытые ключи сервера.В целях тестирования мы просто можем использовать то же хранилище ключей, что и для сервера, а для производства вам наверняка не понадобится закрытый ключ сервера на стороне клиента.Мы предоставляем это со свойствами javax.net.ssl.trustStore
javax.net.ssl.trustStorePassword
.
Затем все сводится к следующему (на стороне сервера):
Remote server =
UnicastRemoteObject.exportObject(new EchoServerImpl(),
0, fac, fac);
System.err.println("server: " + server);
Registry registry =
LocateRegistry.createRegistry(Registry.REGISTRY_PORT);
registry.bind("echo", server);
Клиент является стандартным клиентом дляпредыдущие примеры:
Registry registry =
LocateRegistry.getRegistry("localhost",
Registry.REGISTRY_PORT);
EchoServer es = (EchoServer)registry.lookup("echo");
System.err.println("es: " + es);
System.out.println(es.echo("hallo"));
Теперь вся связь с EchoServer выполняется сжатой и зашифрованной.Конечно, для полной безопасности мы также хотели бы, чтобы связь с реестром была защищена с помощью SSL, чтобы избежать любых атак «человек посередине» (что позволило бы также перехватывать связь с EchoServer, предоставляя клиенту фальшивый RMIClientSocketFactory или фальшивыйадрес сервера).