Фабрика маршрутизаторов OpenAPI и EventBus - PullRequest
1 голос
/ 23 апреля 2019

Недавно мы изменили наш API-интерфейс, чтобы использовать веб-контракт openapi с комбинацией eventbus для отправки и получения API-ответов для наших клиентов.

До сегодняшнего дня все работало нормально, когда мы заметили, что большинство наших API истекло, были выданы следующие ошибки.

API FAILURE: (TIMEOUT,-1) Timed out after waiting 30000(ms) for a reply. address: __vertx.reply.448, repliedAddress: /v1/analytics/post
at io.vertx.core.eventbus.impl.HandlerRegistration.sendAsyncResultFailure(HandlerRegistration.java:146)
at io.vertx.core.eventbus.impl.HandlerRegistration.lambda$new$0(HandlerRegistration.java:78)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:907)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:866)
at io.vertx.core.impl.ContextImpl.executeTask(ContextImpl.java:320)
at io.vertx.core.impl.EventLoopContext.lambda$executeAsync$0(EventLoopContext.java:38)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)

Мы не смогли отладить как наши потребители, даже не получив сообщения для ответа. Наши журналы только что имели эти сообщения об ошибках.

например, веб-контракт openapi и настройка Eventbus, все наши API настроены таким образом

routerFactory.addHandlerByOperationId("addViews", routingContext -> {
           RequestParameters requestParameters = routingContext.get("parsedParameters");

                vertx.eventBus()
                        .send(Endpoints.API_ANALYTICS_POST,
                                Json.encodeToBuffer(requestParameters.body().getJsonObject()),
                                result -> {
                                    if (result.succeeded()) {
                                        successResponse(routingContext, result.result());
                                    } else {
                                        errorResponse(routingContext, result.cause());
                                    }
                                });

        });
        registerFailureHandler(routerFactory,"addViews");

Мы используем шину общих событий с того времени, а не шину событий кластера. Есть ли ограничения на количество сообщений, которые может обрабатывать шина событий?

MessageConsumer<Buffer> views = eventBus.consumer(Endpoints.API_ANALYTICS_POST);


views.handler(message->{
        addPostViews(message, message.body().toJsonObject());
    });



private void addPostViews(Message message, JsonObject request) {
    LOGGER.info("PostViewsEvent: {0}",  request.toString());
    postMetricsService.addPostViews(request).setHandler(res -> {
        JsonObject response = new JsonObject();
        response.put("captured", true);
        if (res.succeeded()) {
            message.reply(ApiResponse.getCustomSuccessResponse(response));
        } else {
            errorResponse(message,res.cause());
        }
    });
}

public Future<Boolean> addPostViews(JsonObject views) {
    Future<Boolean> future = Future.future();
    future.complete(true);
    // mongo db query to store views
    // redis query to store views
}
...