Недавно мы изменили наш API-интерфейс, чтобы использовать веб-контракт openapi с комбинацией eventbus для отправки и получения API-ответов для наших клиентов.
До сегодняшнего дня все работало нормально, когда мы заметили, что большинство наших API истекло, были выданы следующие ошибки.
API FAILURE: (TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: __vertx.reply.448, repliedAddress: /v1/analytics/post
at io.vertx.core.eventbus.impl.HandlerRegistration.sendAsyncResultFailure(HandlerRegistration.java:146)
at io.vertx.core.eventbus.impl.HandlerRegistration.lambda$new$0(HandlerRegistration.java:78)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:907)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:866)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
Мы не смогли отладить как наши потребители, даже не получив сообщения для ответа. Наши журналы только что имели эти сообщения об ошибках.
например, веб-контракт openapi и настройка Eventbus, все наши API настроены таким образом
routerFactory.addHandlerByOperationId("addViews", routingContext -> {
RequestParameters requestParameters = routingContext.get("parsedParameters");
vertx.eventBus()
.send(Endpoints.API_ANALYTICS_POST,
Json.encodeToBuffer(requestParameters.body().getJsonObject()),
result -> {
if (result.succeeded()) {
successResponse(routingContext, result.result());
} else {
errorResponse(routingContext, result.cause());
}
});
});
registerFailureHandler(routerFactory,"addViews");
Мы используем шину общих событий с того времени, а не шину событий кластера. Есть ли ограничения на количество сообщений, которые может обрабатывать шина событий?
MessageConsumer<Buffer> views = eventBus.consumer(Endpoints.API_ANALYTICS_POST);
views.handler(message->{
addPostViews(message, message.body().toJsonObject());
});
private void addPostViews(Message message, JsonObject request) {
LOGGER.info("PostViewsEvent: {0}", request.toString());
postMetricsService.addPostViews(request).setHandler(res -> {
JsonObject response = new JsonObject();
response.put("captured", true);
if (res.succeeded()) {
message.reply(ApiResponse.getCustomSuccessResponse(response));
} else {
errorResponse(message,res.cause());
}
});
}
public Future<Boolean> addPostViews(JsonObject views) {
Future<Boolean> future = Future.future();
future.complete(true);
// mongo db query to store views
// redis query to store views
}