Выполнение вложенных асинхронных вызовов функций с использованием обратных вызовов Java - PullRequest
0 голосов
/ 10 июля 2019

У меня есть 3 уровня асинхронных вызовов функций,

  1. Я получил событие.
  2. Я получаю данные, связанные с событием, из API в асинхронном режиме.
  3. После завершения обратного вызова я обрабатываю данные и вызываю другой API асинхронно.
  4. Как только второй API возвращает ответ, я готовлю окончательные данные для публикации и асинхронно отправляю их в службу.
  5. Если все вышеперечисленные шаги выполнены успешно, я помечаю пакет сообщений, полученных из Kafka, как успешный и фиксирую обратно в Kafka.

Вот код ниже

 @Override
    public void run() {
        Request r1 = getr1();
        this.asyncHttpClient.executeRequest(r1, new AsyncCompletionHandler<Response>() {
            @Override
            public Response onCompleted(Response res1) {
                try {
                    if (res1.getStatusCode() == 200) {
                        LOG.info("Successfully retrieved samples from R1");
                        R1Response r1Response = Utils.GSON.fromJson(
                                res1.getResponseBody(), R1Response.class
                        );
                        final List<T1> t1 = r1Response.getData().stream().
                                filter(Objects::nonNull).map(T1::getSample).filter(Objects::nonNull).
                                map(x -> Utils.GSON.fromJson(x, T1.class)).
                                collect(Collectors.toList());
                        final List<T2> l1 = new LinkedList<>();
                        final Request r2 = getr2(t1, l1);
                        asyncHttpClient.executeRequest(r2, new AsyncCompletionHandler<Response>() {
                            @Override
                            public Response onCompleted(Response r2) {
                                try {
                                    if (r2.getStatusCode() == 200) {
                                        LOG.info("Got success response from service");
                                        T3 r3 = Utils.GSON.fromJson(
                                                r2.getResponseBody(), T3.class
                                        );
                                        T4 iocMessages = getT4(l1, r3);
                                        grpcClient.flushAsync(iocMessages, x -> {
                                            if (x.getCode().equals(x.Retcode.INTERNAL_ERROR)) {
                                                LOG.error("Failed to publish to service, error message: {}",
                                                        x.getErrmsg());
                                                markIntervalFailed();
                                            } else {
                                                LOG.info("Callback completed: response code: {}", x.getCode());
                                                markIntervalSuccess();
                                            }
                                        });
                                    } else {
                                        LOG.error("Failed to get the response from the API, status code: {}",
                                                r2.getStatusCode());
                                        LOG.error("Response body: {}", r2.getResponseBody());
                                        markIntervalFailed();
                                    }
                                } catch (Exception e) {
                                    LOG.error("Exception encountered while waiting for response from API: {}",
                                            e.getMessage());
                                    LogExceptionTrace.logExceptionStackTrace(e);
                                    markIntervalFailed();
                                }
                                return r2;
                            }
                        });
                    } else {
                        LOG.error("Failed to get the response from the R1, status code: {}",
                                res1.getStatusCode());
                        LOG.error("Response body: {}", res1.getResponseBody());
                        markIntervalFailed();
                    }
                } catch (Exception e) {
                    LOG.error("Exception encountered while waiting for response from R1: {}", e.getMessage());
                    LogExceptionTrace.logExceptionStackTrace(e);
                    markIntervalFailed();
                }
                return res1;
            }
        });
    }

Мне любопытно, что, поскольку все эти обратные вызовы обрабатываются потоками ввода-вывода, будет хорошей идеей связывать вызовы один за другим, а также я немного сбит с толку относительно того, как реализована линейная реализация интерфейса AsyncCompletionHandler. в функции executeRequest есть доступ к моим функциям-членам класса?

...