Необработанное исключение asyn c в ресурсе с отправленным сервером событием RESTEasy + Spring Integracion - PullRequest
0 голосов
/ 02 апреля 2020

Задача : API построен с использованием JAX-RS RESTEasy и Spring (RESTEasy-Spring Dependency), чтобы позволить API использовать функции вставки зависимостей Spring, а также. Здесь все работает нормально. Затем в ресурсе есть метод для отправки сообщений с сервера клиенту в режиме реального времени (Server-Sent Event). Я взял этот код ( введите описание ссылки здесь ), чтобы построить метод, но в примере используется EJB и определена некоторая аннотация параллелизма (@Singleton, @LOCK, LockType. {READ, WRITE}).

Проблема : Когда я использую этот метод, сервер выдает следующее исключение:

20:15:29,669 ERROR [org.jboss.resteasy.resteasy_jaxrs.i18n] (default task-1) RESTEASY002020: Unhandled asynchronous exception, sending back 500: org.jboss.resteasy.spi.UnhandledException: java.lang.NullPointerException

У меня проблема связана с параллелизмом , Сначала приложение было автономным (RUNNING ON JETTY), затем оно было развернуто на сервере приложений wildfly, чтобы использовать функции EJB и повторить тот же пример. Однако из-за того, что я использую RESTEasy + Spring, ресурс (JAX-RS Resource) должен быть аннотирован @Component и управляться Spring Container. Я попытался заменить это на аннотацию EJB @Singleton и использовать аннотацию @Lock () также в методах, но когда это сделано, ресурс не найден (не развернут). Я приглашаю, чтобы ресурс работал с аннотацией @Component, чтобы эта интеграция (RESTEasy + Spring) работала. Я искал параллелизм с Java SE и использовал следующий пакет java.util.concurrent.locks, но безуспешно.

Это мой интерфейс и класс:

package edu.cibertec.votoelectronico.resource;

import java.io.IOException;

import javax.ejb.Lock;
import javax.ejb.LockType;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.sse.SseEventSink;

import org.springframework.context.event.EventListener;

import edu.cibertec.votoelectronico.application.event.EmitirVotoEvent;

@Path("/votoelectronico/subscribe")
public interface SSEVotoElectronicoResource {

    @GET
    @Lock(LockType.READ)
    @Path("/resultado")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void obtenerResultados(@HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER) @DefaultValue("-1") int lastEventId,
            @Context SseEventSink eventSink) throws IOException;

    @Lock(LockType.WRITE)
    @EventListener
    public void onEmitirVotoEvent(EmitirVotoEvent domainEvent);

    @GET
    @Path("/")
    @Produces(MediaType.APPLICATION_JSON)
    public Response test();

}

package edu.cibertec.votoelectronico.resource;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;

import javax.annotation.PostConstruct;
//import javax.ejb.Lock;
//import javax.ejb.LockType;
//import javax.ejb.Singleton;
import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.Path;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import edu.cibertec.votoelectronico.application.event.EmitirVotoEvent;
import edu.cibertec.votoelectronico.domain.complex.VotoResumen;
import edu.cibertec.votoelectronico.dto.VotoResumenDto;
import edu.cibertec.votoelectronico.mapping.MapperFactoryRegistry;
import edu.cibertec.votoelectronico.resource.communication.BaseResponse;
import edu.cibertec.votoelectronico.resource.communication.ResumenProcesoResponse;
import edu.cibertec.votoelectronico.service.VotoService;

//@Singleton
@Path("/votoelectronico/subscribe")
@Component
public class SimpleSSEVotoElectronicoResource implements SSEVotoElectronicoResource {

    private final Logger LOG = LoggerFactory.getLogger(SimpleSSEVotoElectronicoResource.class);

    private Sse sse;

    private final Lock readLock;
    private final Lock writeLock;

    @Autowired
    private VotoService service;
    @Autowired
    private MapperFactoryRegistry mapper;

    private SseBroadcaster sseBroadcaster;
    private int lastEventId;
    private final List<ResumenProcesoResponse> messages = new ArrayList<ResumenProcesoResponse>();

    public SimpleSSEVotoElectronicoResource() {
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
        this.readLock = readWriteLock.readLock();
        this.writeLock = readWriteLock.writeLock();
    }

    @PostConstruct
    public void init() {
        LOG.info("On Init method...");
    }

    @Context
    public void setSse(Sse sse) {
        this.sse = sse;
        this.sseBroadcaster = this.sse.newBroadcaster();
        this.sseBroadcaster.onError((o, e) -> {
            LOG.error("Ocurred an error on broadcasting: ", e);
        });
        this.sseBroadcaster.onClose((eventSink) -> {
            LOG.info("Broadcast closed: ", eventSink);
        });
    }

    @Override
    // @Lock(LockType.READ)
    public void obtenerResultados(int lastEventId, SseEventSink eventSink) throws IOException {
        readLock.lock();
        try {
            if (lastEventId >= 0)
                this.replyLastMessage(lastEventId, eventSink);
            sseBroadcaster.register(eventSink);
            LOG.info("Client has being registered");
        } finally {
            readLock.unlock();
        }
    }

    @Override
    // @Lock(LockType.WRITE)
    public void onEmitirVotoEvent(EmitirVotoEvent domainEvent) {
        writeLock.lock();
        try {
            LOG.info("EmitirVoto Event received");
            ResumenProcesoResponse response = this.fetchResumenProceso();
            this.messages.add(response);
            OutboundSseEvent event = createEvent(response, ++this.lastEventId);
            LOG.info("Server about to send Event");
            this.sseBroadcaster.broadcast(event);
        } finally {
            writeLock.unlock();
        }
    }

    private void replyLastMessage(int lastEventId, SseEventSink eventSink) {
        try {
            for (int i = lastEventId; i < messages.size(); i++) {
                eventSink.send(createEvent(messages.get(i), i + 1));
            }
        } catch (Exception e) {
            throw new InternalServerErrorException("Could not reply messages ", e);
        }
    }

    private ResumenProcesoResponse fetchResumenProceso() {
        ResumenProcesoResponse response = null;
        try {
            LOG.info("Fetching proccess resume..");
            Collection<VotoResumen> resultados = this.service.results();
            Function<VotoResumen, VotoResumenDto> convert = (VotoResumen object) -> this.mapper.convertFrom(object,
                    VotoResumenDto.class);
            Collection<VotoResumenDto> collection = resultados.stream().map(convert).collect(Collectors.toList());
            response = new ResumenProcesoResponse(collection);
        } catch (Exception e) {
            LOG.error("Ocurred an error while trying to get resume. " + e.getMessage());
            response = new ResumenProcesoResponse(e.getMessage());
        }
        return response;
    }

    private OutboundSseEvent createEvent(ResumenProcesoResponse response, int id) {
        return this.sse.newEventBuilder().id(String.valueOf(id)).data(response).build();
    }

    @Override
    public Response test() {
        try {
            return Response.status(Response.Status.OK).entity(new BaseResponse(true, "")).build();
        } catch (Exception e) {
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).entity(new BaseResponse(false, e.toString()))
                    .build();
        }
    }

}

Полный описание ошибки:

20:15:29,669 ERROR [org.jboss.resteasy.resteasy_jaxrs.i18n] (default task-1) RESTEASY002020: Unhandled asynchronous exception, sending back 500: org.jboss.resteasy.spi.UnhandledException: java.lang.NullPointerException
        at org.jboss.resteasy.core.ExceptionHandler.handleApplicationException(ExceptionHandler.java:82)
        at org.jboss.resteasy.core.ExceptionHandler.handleException(ExceptionHandler.java:346)
        at org.jboss.resteasy.core.SynchronousDispatcher.writeException(SynchronousDispatcher.java:193)
        at org.jboss.resteasy.core.SynchronousDispatcher.asynchronousExceptionDelivery(SynchronousDispatcher.java:510)
        at org.jboss.resteasy.core.AbstractAsynchronousResponse.internalResume(AbstractAsynchronousResponse.java:232)
        at org.jboss.resteasy.plugins.server.servlet.Servlet3AsyncHttpRequest$Servlet3ExecutionContext$Servle3AsychronousResponse.resume(Servlet3AsyncHttpRequest.java:117)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTargetAfterFilter(ResourceMethodInvoker.java:431)
        at org.jboss.resteasy.core.ResourceMethodInvoker.lambda$invokeOnTarget$0(ResourceMethodInvoker.java:370)
        at org.jboss.resteasy.core.interception.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:356)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTarget(ResourceMethodInvoker.java:372)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:344)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:317)
        at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:440)
        at org.jboss.resteasy.core.SynchronousDispatcher.lambda$invoke$4(SynchronousDispatcher.java:229)
        at org.jboss.resteasy.core.SynchronousDispatcher.lambda$preprocess$0(SynchronousDispatcher.java:135)
        at org.jboss.resteasy.core.interception.PreMatchContainerRequestContext.filter(PreMatchContainerRequestContext.java:356)
        at org.jboss.resteasy.core.SynchronousDispatcher.preprocess(SynchronousDispatcher.java:138)
        at org.jboss.resteasy.core.SynchronousDispatcher.invoke(SynchronousDispatcher.java:215)
        at org.jboss.resteasy.plugins.server.servlet.ServletContainerDispatcher.service(ServletContainerDispatcher.java:227)
        at org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher.service(HttpServletDispatcher.java:56)
        at org.jboss.resteasy.plugins.server.servlet.HttpServletDispatcher.service(HttpServletDispatcher.java:51)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:590)
        at io.undertow.servlet.handlers.ServletHandler.handleRequest(ServletHandler.java:74)
        at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:129)
        at io.opentracing.contrib.jaxrs2.server.SpanFinishingFilter.doFilter(SpanFinishingFilter.java:52)
        at io.undertow.servlet.core.ManagedFilter.doFilter(ManagedFilter.java:61)
        at io.undertow.servlet.handlers.FilterHandler$FilterChainImpl.doFilter(FilterHandler.java:131)
        at io.undertow.servlet.handlers.FilterHandler.handleRequest(FilterHandler.java:84)
        at io.undertow.servlet.handlers.security.ServletSecurityRoleHandler.handleRequest(ServletSecurityRoleHandler.java:62)
        at io.undertow.servlet.handlers.ServletChain$1.handleRequest(ServletChain.java:68)
        at io.undertow.servlet.handlers.ServletDispatchingHandler.handleRequest(ServletDispatchingHandler.java:36)
        at org.wildfly.extension.undertow.security.SecurityContextAssociationHandler.handleRequest(SecurityContextAssociationHandler.java:78)
        at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
        at io.undertow.servlet.handlers.RedirectDirHandler.handleRequest(RedirectDirHandler.java:68)
        at io.undertow.servlet.handlers.security.SSLInformationAssociationHandler.handleRequest(SSLInformationAssociationHandler.java:132)
        at io.undertow.servlet.handlers.security.ServletAuthenticationCallHandler.handleRequest(ServletAuthenticationCallHandler.java:57)
        at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
        at io.undertow.security.handlers.AbstractConfidentialityHandler.handleRequest(AbstractConfidentialityHandler.java:46)
        at io.undertow.servlet.handlers.security.ServletConfidentialityConstraintHandler.handleRequest(ServletConfidentialityConstraintHandler.java:64)
        at io.undertow.security.handlers.AuthenticationMechanismsHandler.handleRequest(AuthenticationMechanismsHandler.java:60)
        at io.undertow.servlet.handlers.security.CachedAuthenticatedSessionHandler.handleRequest(CachedAuthenticatedSessionHandler.java:77)
        at io.undertow.security.handlers.NotificationReceiverHandler.handleRequest(NotificationReceiverHandler.java:50)
        at io.undertow.security.handlers.AbstractSecurityContextAssociationHandler.handleRequest(AbstractSecurityContextAssociationHandler.java:43)
        at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
        at org.wildfly.extension.undertow.security.jacc.JACCContextIdHandler.handleRequest(JACCContextIdHandler.java:61)
        at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
        at org.wildfly.extension.undertow.deployment.GlobalRequestControllerHandler.handleRequest(GlobalRequestControllerHandler.java:68)
        at io.undertow.server.handlers.PredicateHandler.handleRequest(PredicateHandler.java:43)
        at io.undertow.servlet.handlers.ServletInitialHandler.handleFirstRequest(ServletInitialHandler.java:269)
        at io.undertow.servlet.handlers.ServletInitialHandler.access$100(ServletInitialHandler.java:78)
        at io.undertow.servlet.handlers.ServletInitialHandler$2.call(ServletInitialHandler.java:133)
        at io.undertow.servlet.handlers.ServletInitialHandler$2.call(ServletInitialHandler.java:130)
        at io.undertow.servlet.core.ServletRequestContextThreadSetupAction$1.call(ServletRequestContextThreadSetupAction.java:48)
        at io.undertow.servlet.core.ContextClassLoaderSetupAction$1.call(ContextClassLoaderSetupAction.java:43)
        at org.wildfly.extension.undertow.security.SecurityContextThreadSetupAction.lambda$create$0(SecurityContextThreadSetupAction.java:105)
        at org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1504)
        at org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1504)
        at org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1504)
        at org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1504)
        at org.wildfly.extension.undertow.deployment.UndertowDeploymentInfoService$UndertowThreadSetupAction.lambda$create$0(UndertowDeploymentInfoService.java:1504)
        at io.undertow.servlet.handlers.ServletInitialHandler.dispatchRequest(ServletInitialHandler.java:249)
        at io.undertow.servlet.handlers.ServletInitialHandler.access$000(ServletInitialHandler.java:78)
        at io.undertow.servlet.handlers.ServletInitialHandler$1.handleRequest(ServletInitialHandler.java:99)
        at io.undertow.server.Connectors.executeRootHandler(Connectors.java:376)
        at io.undertow.server.HttpServerExchange$1.run(HttpServerExchange.java:830)
        at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
        at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:1982)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1486)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1377)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at edu.cibertec.votoelectronico.resource.SimpleSSEVotoElectronicoResource.obtenerResultados(SimpleSSEVotoElectronicoResource.java:90)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.jboss.resteasy.core.MethodInjectorImpl.invoke(MethodInjectorImpl.java:138)
        at org.jboss.resteasy.core.ResourceMethodInvoker.internalInvokeOnTarget(ResourceMethodInvoker.java:517)
        at org.jboss.resteasy.core.ResourceMethodInvoker.invokeOnTargetAfterFilter(ResourceMethodInvoker.java:406)
        ... 63 more

Пример кода:

package com.example.sse.boundary;

import com.example.sse.entity.DomainEvent;

import javax.annotation.PostConstruct;
import javax.ejb.Lock;
import javax.ejb.Singleton;
import javax.enterprise.event.Observes;
import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.sse.OutboundSseEvent;
import javax.ws.rs.sse.Sse;
import javax.ws.rs.sse.SseBroadcaster;
import javax.ws.rs.sse.SseEventSink;
import java.util.ArrayList;
import java.util.List;

import static javax.ejb.LockType.READ;
import static javax.ejb.LockType.WRITE;

@Path("events-examples")
@Singleton
public class EventsResource {

    @Context
    Sse sse;

    private SseBroadcaster sseBroadcaster;
    private int lastEventId;
    private List<String> messages = new ArrayList<>();

    @PostConstruct
    public void initSse() {
        sseBroadcaster = sse.newBroadcaster();

        sseBroadcaster.onError((o, e) -> {
            // ...
        });
    }

    @GET
    @Lock(READ)
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public void itemEvents(@HeaderParam(HttpHeaders.LAST_EVENT_ID_HEADER)
                           @DefaultValue("-1") int lastEventId,
                           @Context SseEventSink eventSink) {

        if (lastEventId >= 0)
            replayLastMessages(lastEventId, eventSink);

        sseBroadcaster.register(eventSink);
    }

    private void replayLastMessages(int lastEventId, SseEventSink eventSink) {
        try {
            for (int i = lastEventId; i < messages.size(); i++) {
                eventSink.send(createEvent(messages.get(i), i + 1));
            }
        } catch (Exception e) {
            throw new InternalServerErrorException("Could not replay messages ", e);
        }
    }

    private OutboundSseEvent createEvent(String message, int id) {
        return sse.newEventBuilder().id(String.valueOf(id)).data(message).build();
    }

    @Lock(WRITE)
    public void onEvent(@Observes DomainEvent domainEvent) {
        String message = domainEvent.getContents();
        messages.add(message);

        OutboundSseEvent event = createEvent(message, ++lastEventId);

        sseBroadcaster.broadcast(event);
    }

}

Исходный код здесь: введите описание ссылки здесь

Я пытался поймать ошибка, но безуспешно.

Можете ли вы помочь мне решить эту ошибку или дать несколько советов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...