Я работаю над RSocket и написал большую часть java-версии, включая транспорт Aeron.
В настоящее время я бы не рекомендовал использовать реализацию Aeron. Вы можете отправлять файлы несколькими способами:
- Использование requestChannel для передачи данных на удаленный сервер.
- Используйте requestChannel или requestStream для потоковой передачи байтов клиенту.
Вот пример использования requestStream:
public class FileCopy {
public static void main(String... args) throws Exception {
// Create a socket that receives incoming connections
RSocketFactory.receive()
.acceptor(
new SocketAcceptor() {
@Override
// Create a new socket acceptor
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
return Mono.just(
new AbstractRSocket() {
@Override
public Flux<Payload> requestStream(Payload payload) {
// Get the path of the file to copy
String path = payload.getDataUtf8();
SeekableByteChannel _channel = null;
try {
_channel = Files.newByteChannel(Paths.get(path), StandardOpenOption.READ);
} catch (IOException e) {
return Flux.error(e);
}
ReferenceCountUtil.safeRelease(payload);
SeekableByteChannel channel = _channel;
// Use Flux.generate to create a publisher that returns file at 1024 bytes
// at a time
return Flux.generate(
sink -> {
try {
ByteBuffer buffer = ByteBuffer.allocate(1024);
int read = channel.read(buffer);
buffer.flip();
sink.next(DefaultPayload.create(buffer));
if (read == -1) {
channel.close();
sink.complete();
}
} catch (Throwable t) {
sink.error(t);
}
});
}
});
}
})
.transport(TcpServerTransport.create(9090))
.start()
.subscribe();
String path = args[0];
String dest = args[1];
// Connect to a server
RSocket client =
RSocketFactory.connect().transport(TcpClientTransport.create(9090)).start().block();
File f = new File(dest);
f.createNewFile();
// Open a channel to a new file
SeekableByteChannel channel =
Files.newByteChannel(f.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
// Request a stream of bytes
client
.requestStream(DefaultPayload.create(path))
.doOnNext(
payload -> {
try {
// Write the bytes received to the new file
ByteBuffer data = payload.getData();
channel.write(data);
// Release the payload
ReferenceCountUtil.safeRelease(payload);
} catch (Exception e) {
throw new RuntimeException(e);
}
})
// Block until all the bytes are received
.blockLast();
// Close the file you're writing too
channel.close();
}
}