RSocket работает с сгенерированными данными, но не с Spring Reactive MongoDB - PullRequest
0 голосов
/ 01 января 2019

Сводка разрешения:

В большинстве примеров RSocket, которые в настоящее время существуют, приемник на стороне сервера просто создается как новый объект (например, новый MqttMessageService () ниже) даже в SpringBoot, связанном с ним.учебные пособия.Это нормально, если вы генерируете пример содержимого прямо в классе акцептора, но может привести к путанице, связанной с внедрением зависимостей, когда акцептор зависит от других bean-компонентов в контейнере.

Исходный вопрос:

Я получаю исключение NullPointerException при попытке потоковой передачи записей базы данных с помощью репозитория Spring Data Reactive Mongodb через Java-сервер Rsocket.

Проблема заключается в том, что во время отладки все компоненты работают отдельно :Я могу получить запрошенные данные через тот же репозиторий Mongodb, а также могу передавать произвольно сгенерированные данные между тем же сервером и клиентом, используя Rsocket.

Так что я либо упускаю что-то действительно простое, либо может быть проблема с использованием Reactive Mongodb и Rsocket вместе.

Вот исходная конфигурация Rsocket на стороне сервера :

@Configuration
public class RsocketConfig {

    @PostConstruct
    public void startServer() {
        RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
                .transport(TcpServerTransport.create(8802))
                .start()
                .block()
                .onClose()
    }
}

А вот рабочая конфигурация Rsocket на стороне сервера с правильным DI:

@Configuration
public class RsocketConfig {

    @Autowired
    MqttMessageService messageService;

    @PostConstruct
    public void startServer() {
        RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> Mono.just(messageService))
                .transport(TcpServerTransport.create(8802))
                .start()
                .block()
                .onClose()
    }
}

Вот реализация на стороне сервера AbstractRSocket где NullPointerException генерируется при возврате service.findAll ().

@Service
public class MqttMessageService extends AbstractRSocket {



    @Autowired 
    private MqttMessageEntityService service;

    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return service.findAll()
            .map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));

    }
}

Вот реактивный репозиторий и связанный сервис.Служба возвращает ноль при внедрении в реализацию сервера AbstractRSocket, но отлично работает при внедрении в другие классы:

@Service
public class MqttMessageEntityService {

    @Autowired
    private MqttMessageEntityRepository repository;

    public Flux<MqttMessageEntity> findAll() {
        return repository.findAll();
    }

}

public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {

}

А вот код на стороне клиента , который отлично работает с содержимым теста:

@Configuration
public class RsocketConfig {

    @PostConstruct
    public void testRsocket() {

        RSocket rSocketClient = RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(8802))
                .start()
                .block();

        rSocketClient
                .requestStream(DefaultPayload.create(""))
                .blockLast();
    }        
}

Возможно, я немного превышаю свой уровень знаний здесь, и ресурсы на эту тему очень ограничены, поэтому я ценю любые подсказки к решению :))

1 Ответ

0 голосов
/ 29 января 2019

Относительно

@PostConstruct
public void startServer() {
    RSocketFactory.receive()
            .acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
            .transport(TcpServerTransport.create(8802))
            .start()
            .block()
            .onClose();
}

Используете ли вы, чтобы сохранить сервер живым?Если это так, добавьте еще один блок после onClose ().

Является ли messageEntityService нулевым?Потому что это выглядит как единственное, что может вызвать ошибку, если переменные topicStart и module не являются.Особенно, если другой код работает - я не вижу ничего, что могло бы вызвать проблемы со стороны RSocket.

...