У меня есть 3 уровня асинхронных вызовов функций,
- Я получил событие.
- Я получаю данные, связанные с событием, из API в асинхронном режиме.
- После завершения обратного вызова я обрабатываю данные и вызываю другой API асинхронно.
- Как только второй API возвращает ответ, я готовлю окончательные данные для публикации и асинхронно отправляю их в службу.
- Если все вышеперечисленные шаги выполнены успешно, я помечаю пакет сообщений, полученных из Kafka, как успешный и фиксирую обратно в Kafka.
Вот код ниже
@Override
public void run() {
Request r1 = getr1();
this.asyncHttpClient.executeRequest(r1, new AsyncCompletionHandler<Response>() {
@Override
public Response onCompleted(Response res1) {
try {
if (res1.getStatusCode() == 200) {
LOG.info("Successfully retrieved samples from R1");
R1Response r1Response = Utils.GSON.fromJson(
res1.getResponseBody(), R1Response.class
);
final List<T1> t1 = r1Response.getData().stream().
filter(Objects::nonNull).map(T1::getSample).filter(Objects::nonNull).
map(x -> Utils.GSON.fromJson(x, T1.class)).
collect(Collectors.toList());
final List<T2> l1 = new LinkedList<>();
final Request r2 = getr2(t1, l1);
asyncHttpClient.executeRequest(r2, new AsyncCompletionHandler<Response>() {
@Override
public Response onCompleted(Response r2) {
try {
if (r2.getStatusCode() == 200) {
LOG.info("Got success response from service");
T3 r3 = Utils.GSON.fromJson(
r2.getResponseBody(), T3.class
);
T4 iocMessages = getT4(l1, r3);
grpcClient.flushAsync(iocMessages, x -> {
if (x.getCode().equals(x.Retcode.INTERNAL_ERROR)) {
LOG.error("Failed to publish to service, error message: {}",
x.getErrmsg());
markIntervalFailed();
} else {
LOG.info("Callback completed: response code: {}", x.getCode());
markIntervalSuccess();
}
});
} else {
LOG.error("Failed to get the response from the API, status code: {}",
r2.getStatusCode());
LOG.error("Response body: {}", r2.getResponseBody());
markIntervalFailed();
}
} catch (Exception e) {
LOG.error("Exception encountered while waiting for response from API: {}",
e.getMessage());
LogExceptionTrace.logExceptionStackTrace(e);
markIntervalFailed();
}
return r2;
}
});
} else {
LOG.error("Failed to get the response from the R1, status code: {}",
res1.getStatusCode());
LOG.error("Response body: {}", res1.getResponseBody());
markIntervalFailed();
}
} catch (Exception e) {
LOG.error("Exception encountered while waiting for response from R1: {}", e.getMessage());
LogExceptionTrace.logExceptionStackTrace(e);
markIntervalFailed();
}
return res1;
}
});
}
Мне любопытно, что, поскольку все эти обратные вызовы обрабатываются потоками ввода-вывода, будет хорошей идеей связывать вызовы один за другим, а также я немного сбит с толку относительно того, как реализована линейная реализация интерфейса AsyncCompletionHandler
. в функции executeRequest
есть доступ к моим функциям-членам класса?