Я пытался создать образец клиент / сервер, используя Angular, Spring RSocket, Spring Data Mon go Reactive.
Полные коды, проверьте здесь .
Внутренний код:
@SpringBootApplication
class RSocketServerApplication
fun main(args: Array<String>) {
runApplication<RSocketServerApplication>(*args)
}
@Controller
class MessageController(private val messages: MessageRepository) {
@MessageMapping("send")
fun hello(p: String) = this.messages.save(ChatMessage(body = p, sentAt = Instant.now())).log().then()
@MessageMapping("messages")
fun messageStream(): Flux<ChatMessage> = this.messages.getMessagesBy().log()
}
interface MessageRepository : ReactiveMongoRepository<ChatMessage, String> {
@Tailable
fun getMessagesBy(): Flux<ChatMessage>
}
@Document
data class ChatMessage(@Id var id: String? = null, var body: String, var sentAt: Instant = Instant.now())
В коде внешнего интерфейса.
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit, OnDestroy {
title = 'client';
message = '';
messages: string[];
client: RSocketClient;
sub = new Subject();
ngOnInit(): void {
this.messages = [];
// Create an instance of a client
this.client = new RSocketClient({
serializers: {
data: JsonSerializer,
metadata: IdentitySerializer
},
setup: {
// ms btw sending keepalive to server
keepAlive: 60000,
// ms timeout if no keepalive response
lifetime: 180000,
// format of `data`
dataMimeType: 'application/json',
// format of `metadata`
metadataMimeType: 'message/x.rsocket.routing.v0',
},
transport: new RSocketWebSocketClient({
url: 'ws://localhost:8080/rsocket'
}),
});
// Open the connection
this.client.connect().subscribe({
onComplete: (socket: RSocket) => {
// socket provides the rsocket interactions fire/forget, request/response,
// request/stream, etc as well as methods to close the socket.
socket
.requestStream({
data: "",
metadata: String.fromCharCode('messages'.length) + 'messages'
})
.subscribe({
onComplete: () => console.log('complete'),
onError: error => {
console.log(error);
this.addErrorMessage("Connection has been closed due to ", error);
},
onNext: payload => {
console.log(payload);
this.addMessage(payload.data);
},
onSubscribe: subscription => {
subscription.request(1);
},
});
this.sub.subscribe({
next: (data) => {
socket.fireAndForget({
data: data,
metadata: String.fromCharCode('send'.length) + 'send',
});
}
})
},
onError: error => {
console.log(error);
this.addErrorMessage("Connection has been refused due to ", error);
},
onSubscribe: cancel => {
/* call cancel() to abort */
}
});
}
addErrorMessage(description: string, error: any) {
console.log(description + ', ' + error);
}
addMessage(newMessage: string) {
this.messages = [...this.messages, newMessage];
}
ngOnDestroy(): void {
this.sub.unsubscribe();
if (this.client) {
this.client.close();
}
}
sendMessage() {
console.log("sending message:" + this.message);
this.sub.next(this.message);
this.message = '';
}
}
Я пытался использовать метод requestStream
для подключения маршрута сообщений , но в API rscoket-js
кажется, что мне нужно настроить параметр Subscription
и указать там номер запроса, иначе он вообще не подключается.
Когда я запускал интерфейсное приложение , Я получил следующую ошибку в консоли браузера.
Error: RSocket error 0x201 (APPLICATION_ERROR): Query failed with error code 2 and error message 'error processing query: ns=chat.chatMessage batchSize=32Tree: $and
Sort: {}
Proj: {}
tailable cursor requested on non capped collection' on server localhost:27017; nested exception is com.mongodb.MongoQueryException: Query failed with error code 2 and error message 'error processing query: ns=chat.chatMessage batchSize=32Tree: $and
Sort: {}
Proj: {}
tailable cursor requested on non capped collection' on server localhost:27017. See error `source` property for details.
createErrorFromFrame RSocketFrame.js:189
_handleStreamFrame RSocketMachine.js:625
_handleFrame RSocketMachine.js:192
onNext Flowable.js:233
_handleMessage RSocketWebSocketClient.js:52
_handleMessage RSocketWebSocketClient.js:52
Angular 7
app.component.ts:58:22
Connection has been closed due to , Error: RSocket error 0x201 (APPLICATION_ERROR): Query failed with error code 2 and error message 'error processing query: ns=chat.chatMessage batchSize=32Tree: $and
Sort: {}
Proj: {}
tailable cursor requested on non capped collection' on server localhost:27017; nested exception is com.mongodb.MongoQueryException: Query failed with error code 2 and error message 'error processing query: ns=chat.chatMessage batchSize=32Tree: $and
Sort: {}
Proj: {}
tailable cursor requested on non capped collection' on server localhost:27017. See error `source` property for details.
Обновление : Я сам решил эти ограниченные коллекции после прочтения соответствующих документов. Spring Data Mon go не создает для вас ограниченную коллекцию по умолчанию.
Теперь я столкнулся с другой ошибкой в клиенте, есть ошибка неожиданно закрывается , вызванная RSocketWebsocketClient
.
Обновление 2 : решено. data
должно быть null
, когда сторона сервера не принимает полезную нагрузку.
Окончательная рабочая версия объединяется с master .