Сводка разрешения:
В большинстве примеров RSocket, которые в настоящее время существуют, приемник на стороне сервера просто создается как новый объект (например, новый MqttMessageService () ниже) даже в SpringBoot, связанном с ним.учебные пособия.Это нормально, если вы генерируете пример содержимого прямо в классе акцептора, но может привести к путанице, связанной с внедрением зависимостей, когда акцептор зависит от других bean-компонентов в контейнере.
Исходный вопрос:
Я получаю исключение NullPointerException при попытке потоковой передачи записей базы данных с помощью репозитория Spring Data Reactive Mongodb через Java-сервер Rsocket.
Проблема заключается в том, что во время отладки все компоненты работают отдельно :Я могу получить запрошенные данные через тот же репозиторий Mongodb, а также могу передавать произвольно сгенерированные данные между тем же сервером и клиентом, используя Rsocket.
Так что я либо упускаю что-то действительно простое, либо может быть проблема с использованием Reactive Mongodb и Rsocket вместе.
Вот исходная конфигурация Rsocket на стороне сервера :
@Configuration
public class RsocketConfig {
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
А вот рабочая конфигурация Rsocket на стороне сервера с правильным DI:
@Configuration
public class RsocketConfig {
@Autowired
MqttMessageService messageService;
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(messageService))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
Вот реализация на стороне сервера AbstractRSocket где NullPointerException генерируется при возврате service.findAll ().
@Service
public class MqttMessageService extends AbstractRSocket {
@Autowired
private MqttMessageEntityService service;
@Override
public Flux<Payload> requestStream(Payload payload) {
return service.findAll()
.map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));
}
}
Вот реактивный репозиторий и связанный сервис.Служба возвращает ноль при внедрении в реализацию сервера AbstractRSocket, но отлично работает при внедрении в другие классы:
@Service
public class MqttMessageEntityService {
@Autowired
private MqttMessageEntityRepository repository;
public Flux<MqttMessageEntity> findAll() {
return repository.findAll();
}
}
public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {
}
А вот код на стороне клиента , который отлично работает с содержимым теста:
@Configuration
public class RsocketConfig {
@PostConstruct
public void testRsocket() {
RSocket rSocketClient = RSocketFactory
.connect()
.transport(TcpClientTransport.create(8802))
.start()
.block();
rSocketClient
.requestStream(DefaultPayload.create(""))
.blockLast();
}
}
Возможно, я немного превышаю свой уровень знаний здесь, и ресурсы на эту тему очень ограничены, поэтому я ценю любые подсказки к решению :))