Как мне создать отправляющий файл клиент / сервер с RSocket? - PullRequest
0 голосов
/ 27 апреля 2018

Я не могу найти какие-либо ресурсы / учебники по RSocket , кроме чтения их кода на GitHub, чего я не понимаю.

У меня есть путь к файлу на моем сервере: String serverFilePath;

Я хотел бы иметь возможность загрузить его с моего клиента (используя Реализация Aeron от RSocket , предпочтительно). Кто-нибудь знает, как это сделать с помощью RSocket?

Заранее спасибо.

Ответы [ 2 ]

0 голосов
/ 16 мая 2019

Здесь есть возобновляемый пример передачи файлов

https://github.com/rsocket/rsocket-java/commit/d47629147dd1a4d41c7c8d5af3d80838e01d3ba5

0 голосов
/ 18 мая 2018

Я работаю над RSocket и написал большую часть java-версии, включая транспорт Aeron.

В настоящее время я бы не рекомендовал использовать реализацию Aeron. Вы можете отправлять файлы несколькими способами:

  1. Использование requestChannel для передачи данных на удаленный сервер.
  2. Используйте requestChannel или requestStream для потоковой передачи байтов клиенту.

Вот пример использования requestStream:

  public class FileCopy {

  public static void main(String... args) throws Exception {

    // Create a socket that receives incoming connections
    RSocketFactory.receive()
        .acceptor(
            new SocketAcceptor() {
              @Override
              // Create a new socket acceptor
              public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
                return Mono.just(
                    new AbstractRSocket() {
                      @Override
                      public Flux<Payload> requestStream(Payload payload) {
                        // Get the path of the file to copy
                        String path = payload.getDataUtf8();
                        SeekableByteChannel _channel = null;

                        try {
                          _channel = Files.newByteChannel(Paths.get(path), StandardOpenOption.READ);
                        } catch (IOException e) {
                          return Flux.error(e);
                        }

                        ReferenceCountUtil.safeRelease(payload);

                        SeekableByteChannel channel = _channel;
                        // Use Flux.generate to create a publisher that returns file at 1024 bytes
                        // at a time
                        return Flux.generate(
                            sink -> {
                              try {
                                ByteBuffer buffer = ByteBuffer.allocate(1024);
                                int read = channel.read(buffer);
                                buffer.flip();
                                sink.next(DefaultPayload.create(buffer));

                                if (read == -1) {
                                  channel.close();
                                  sink.complete();
                                }
                              } catch (Throwable t) {
                                sink.error(t);
                              }
                            });
                      }
                    });
              }
            })
        .transport(TcpServerTransport.create(9090))
        .start()
        .subscribe();

    String path = args[0];
    String dest = args[1];

    // Connect to a server
    RSocket client =
        RSocketFactory.connect().transport(TcpClientTransport.create(9090)).start().block();

    File f = new File(dest);
    f.createNewFile();

    // Open a channel to a new file
    SeekableByteChannel channel =
        Files.newByteChannel(f.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);

    // Request a stream of bytes
    client
        .requestStream(DefaultPayload.create(path))
        .doOnNext(
            payload -> {
              try {
                // Write the bytes received to the new file
                ByteBuffer data = payload.getData();
                channel.write(data);

                // Release the payload
                ReferenceCountUtil.safeRelease(payload);
              } catch (Exception e) {
                  throw new RuntimeException(e);
              }
            })
        // Block until all the bytes are received
        .blockLast();

    // Close the file you're writing too
    channel.close();
  }
}
...