Проблема с Rsocket JS при подключении серверной части на основе Spring RSocket - PullRequest
0 голосов
/ 12 июля 2020

Я пытался создать образец клиент / сервер, используя Angular, Spring RSocket, Spring Data Mon go Reactive.

Полные коды, проверьте здесь .

Внутренний код:

@SpringBootApplication
class RSocketServerApplication

fun main(args: Array<String>) {
    runApplication<RSocketServerApplication>(*args)
}

@Controller
class MessageController(private val messages: MessageRepository) {
    @MessageMapping("send")
    fun hello(p: String) = this.messages.save(ChatMessage(body = p, sentAt = Instant.now())).log().then()

    @MessageMapping("messages")
    fun messageStream(): Flux<ChatMessage> = this.messages.getMessagesBy().log()
}

interface MessageRepository : ReactiveMongoRepository<ChatMessage, String> {
    @Tailable
    fun getMessagesBy(): Flux<ChatMessage>
}

@Document
data class ChatMessage(@Id var id: String? = null, var body: String, var sentAt: Instant = Instant.now())

В коде внешнего интерфейса.

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit, OnDestroy {

  title = 'client';
  message = '';
  messages: string[];
  client: RSocketClient;
  sub = new Subject();

  ngOnInit(): void {
    this.messages = [];

    // Create an instance of a client
    this.client = new RSocketClient({
      serializers: {
        data: JsonSerializer,
        metadata: IdentitySerializer
      },
      setup: {
        // ms btw sending keepalive to server
        keepAlive: 60000,
        // ms timeout if no keepalive response
        lifetime: 180000,
        // format of `data`
        dataMimeType: 'application/json',
        // format of `metadata`
        metadataMimeType: 'message/x.rsocket.routing.v0',
      },
      transport: new RSocketWebSocketClient({
        url: 'ws://localhost:8080/rsocket'
      }),
    });

    // Open the connection
    this.client.connect().subscribe({
      onComplete: (socket: RSocket) => {

        // socket provides the rsocket interactions fire/forget, request/response,
        // request/stream, etc as well as methods to close the socket.
        socket
          .requestStream({
            data: "",
            metadata: String.fromCharCode('messages'.length) + 'messages'
          })
          .subscribe({
            onComplete: () => console.log('complete'),
            onError: error => {
              console.log(error);
              this.addErrorMessage("Connection has been closed due to ", error);
            },
            onNext: payload => {
              console.log(payload);
              this.addMessage(payload.data);
            },
            onSubscribe: subscription => {
              subscription.request(1);
            },
          });

        this.sub.subscribe({
          next: (data) => {
            socket.fireAndForget({
              data: data,
              metadata: String.fromCharCode('send'.length) + 'send',
            });
          }
        })
      },
      onError: error => {
        console.log(error);
        this.addErrorMessage("Connection has been refused due to ", error);
      },
      onSubscribe: cancel => {
        /* call cancel() to abort */
      }
    });
  }

  addErrorMessage(description: string, error: any) {
    console.log(description + ', ' + error);
  }

  addMessage(newMessage: string) {
    this.messages = [...this.messages, newMessage];
  }

  ngOnDestroy(): void {
    this.sub.unsubscribe();
    if (this.client) {
      this.client.close();
    }
  }

  sendMessage() {
    console.log("sending message:" + this.message);
    this.sub.next(this.message);
    this.message = '';
  }
}

Я пытался использовать метод requestStream для подключения маршрута сообщений , но в API rscoket-js кажется, что мне нужно настроить параметр Subscription и указать там номер запроса, иначе он вообще не подключается.

Когда я запускал интерфейсное приложение , Я получил следующую ошибку в консоли браузера.

Error: RSocket error 0x201 (APPLICATION_ERROR): Query failed with error code 2 and error message 'error processing query: ns=chat.chatMessage batchSize=32Tree: $and
Sort: {}
Proj: {}
 tailable cursor requested on non capped collection' on server localhost:27017; nested exception is com.mongodb.MongoQueryException: Query failed with error code 2 and error message 'error processing query: ns=chat.chatMessage batchSize=32Tree: $and
Sort: {}
Proj: {}
 tailable cursor requested on non capped collection' on server localhost:27017. See error `source` property for details.
    createErrorFromFrame RSocketFrame.js:189
    _handleStreamFrame RSocketMachine.js:625
    _handleFrame RSocketMachine.js:192
    onNext Flowable.js:233
    _handleMessage RSocketWebSocketClient.js:52
    _handleMessage RSocketWebSocketClient.js:52
    Angular 7
app.component.ts:58:22
Connection has been closed due to , Error: RSocket error 0x201 (APPLICATION_ERROR): Query failed with error code 2 and error message 'error processing query: ns=chat.chatMessage batchSize=32Tree: $and
Sort: {}
Proj: {}
 tailable cursor requested on non capped collection' on server localhost:27017; nested exception is com.mongodb.MongoQueryException: Query failed with error code 2 and error message 'error processing query: ns=chat.chatMessage batchSize=32Tree: $and
Sort: {}
Proj: {}
 tailable cursor requested on non capped collection' on server localhost:27017. See error `source` property for details.

Обновление : Я сам решил эти ограниченные коллекции после прочтения соответствующих документов. Spring Data Mon go не создает для вас ограниченную коллекцию по умолчанию.

Теперь я столкнулся с другой ошибкой в ​​клиенте, есть ошибка неожиданно закрывается , вызванная RSocketWebsocketClient.

Обновление 2 : решено. data должно быть null, когда сторона сервера не принимает полезную нагрузку.

Окончательная рабочая версия объединяется с master .

...