Nats запрос / ответ в Java - PullRequest
0 голосов
/ 10 марта 2020

У меня есть знания Kafka, и я поигрался с Nats.io, который, кажется, действительно solid для обмена сообщениями.

В частности, меня интересует хорошо документированный Запрос / Ответ механизм, но у меня были трудности с его правильной реализацией в Java с драйвером Jnats.

Это мой Connector:

 // Single server nats connection
    @PostConstruct
    public void connect() throws ExternalServiceUnavailableException {

        Options options = new Options.Builder()
                .server(connectionString)
                .maxReconnects(20)
                .reconnectWait(Duration.ofSeconds(5))
                .connectionTimeout(Duration.ofSeconds(5))
                .connectionListener((conn, type) -> {
                    if (type == ConnectionListener.Events.CONNECTED) {
                        LOG.info("Connected to Nats Server");
                    } else if (type == ConnectionListener.Events.RECONNECTED) {
                        LOG.info("Reconnected to Nats Server");
                    } else if (type == ConnectionListener.Events.DISCONNECTED) {
                        LOG.error("Disconnected to Nats Server, reconnect attempt in seconds");
                    } else if (type == ConnectionListener.Events.CLOSED) {
                        LOG.info("Closed connection with Nats Server");
                    }
                })
                .build();


        try {
            connection = Nats.connect(options);

        } catch (Exception e) {
            LOG.error("Unable to connect to Nats Server");
            throw new ExternalServiceUnavailableException(ExternalServiceUnavailableException.Service.NATS);
        }

    }

Это метод запроса (с очень высоким время ожидания для целей тестирования):

 public Optional<String> asyncRequest(String topic, String message) throws ExternalServiceUnavailableException {

        Future<Message> reply = natsConnector.getConnection().request(topic, message.getBytes());
        try {

            Message msg = reply.get(10L, TimeUnit.SECONDS);

            LOG.info(new String(msg.getData()));

            return Optional.of(new String(msg.getData(), StandardCharsets.UTF_8));

        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
            LOG.error("Unable to retrieve response for the sent request: " + message);
            throw new ExternalServiceUnavailableException(ExternalServiceUnavailableException.Service.NATS);
        }

    }

И это обработчик ответа с механизмом ответа:

 @PostConstruct
    private void init() {
        Dispatcher dispatcher = natsConnector.getConnection().createDispatcher(message -> {
        });

        Subscription assetsInfo = dispatcher.subscribe("assets-info", message -> {
        JSONObject requestMessage = new JSONObject(new String(message.getData(), StandardCharsets.UTF_8));

            if (requestMessage.getString("requestType").equals("stock-status")) {

                if (requestMessage.getString("of").equals("all")) {

                    JSONObject response = assetQuery.retrieveYesterdayStockStatus();
                    LOG.info("response ready");
                    natsOperation.publishEvent("assets-info", response);
                    LOG.info("message sent");
                }
            }
        });
    }

Мои две независимые службы связываются через докернизированный Nats.io, и я может правильно проверить через клиента Nats Go, что сообщения были отправлены обеими службами на одну и ту же топи c.

К сожалению, «запросчик» при вызове функции asyncRequest не совсем справляется ответ даже при очень высоком ожидании в reply.get(...).

Когда я пытаюсь оценить объект reply в режиме отладки, он не содержит никаких данных и показывает TimeoutException.

В msg.getData() программа вылетает.

Ребята, у вас есть намеки на меня? Спасибо!

1 Ответ

2 голосов
/ 11 марта 2020

Вы должны изменить свой код "автоответчика" на publi sh на тему replyTo из исходного сообщения.

@PostConstruct
    private void init() {
        Dispatcher dispatcher = natsConnector.getConnection().createDispatcher(message -> {
        });

        Subscription assetsInfo = dispatcher.subscribe("assets-info", message -> {
        JSONObject requestMessage = new JSONObject(new String(message.getData(), StandardCharsets.UTF_8));

            if (requestMessage.getString("requestType").equals("stock-status")) {

                if (requestMessage.getString("of").equals("all")) {

                    JSONObject response = assetQuery.retrieveYesterdayStockStatus();
                    LOG.info("response ready");
                    //See Change Here
                    natsOperation.publish(message.getReplyTo(), response);
                    LOG.info("message sent");
                }
            }
        });
    }

Механизм ответа на запрос ищет один ответ на сгенерированный объект субъекта replyTo.

См. https://docs.nats.io/nats-concepts/reqreply

...