Исключение тайм-аута сообщения асинхронного продюсера Kafka - PullRequest
0 голосов
/ 13 февраля 2020

У нас есть приложение Springboot, которое использует spring-cloud-stream-binder-kafka для отправки сообщений в Kafka Topi c. Мы отправляем сообщение в синхронном режиме. Мы используем синхронный режим, потому что мы хотим сделать некоторые действие при возникновении ошибки во время отправки сообщения. Это работает нормально в течение некоторого времени. Мы даже провели нагрузочное тестирование с 100k-сообщениями в час, и у нас никогда не было никаких исключений TimeOut. Но в последнее время мы наблюдаем некоторые исключения из-за тайм-аута. Тайм-аут в основном происходит, когда мы отправляем одно сообщение. Интересная вещь, хотя мы получают исключение TimeOut, сообщение все еще достигает Kafka Topi c. Как только мы получим исключение тайм-аута, следующий набор сообщений проходит без перерыва. В исключении тайм-аута нет последовательности.

Код:

@Service
@EnableBinding(Source.class)
{
try
{
   source .output().send(MessageBuilder.withPayLoad(JsonStringMessage).build();
}
catch(MessageTimeOutException ex)
{
log.error("Kafka MessageTimeOut");
}
}
application .yml :

 spring:
    cloud: 
     stream:
      binders: 
       kafka1:
        type: Kafka
        environment: 
         spring: 
          cloud: 
           stream: 
            kafka:
             binder:
               brokers: broker name
               configuration: 
                  sasl.jass.config: kafkaconnectionstring
                  ssl.mechanism: PLAIN
                  security.protocol: SASL_SSL
                  retries: 3
             bindings:
              output:
               producer: 
                sync:true
      bindings:
        output: 
          binder: kafka1
          destination: Kafka topic name

Версия Springboot: 2.1.5

Spring-cloud-stream-binder-kafka: 2.1.2

Подробности исключений :

org.springframework.integration.MessageTimeOutException: время ожидания ожидания ответа от Kafkaproducer; вложенное исключение java .util.concurrent.TimeoutException

Full StackTrace:

"org.springframework.integration.MessageTimeoutException: время ожидания ответа от KafkaProducer; вложенное исключение java. util.concurrent.TimeoutException, org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.processSendResult (KafkaProducerMessageHandler. java: 497) \ tat org.springframework.integration. 386) \ tat org.springframework.integration.handler. springframework.cloud.stream.binder.AbstractMessageChannelBinder $ SendingHandler.handleMessageInternal (AbstractMessageChannelBinder. java: 1095) \ tat org.springframework.integration.h andler.AbstractMessageHandler.handleMessage (AbstractMessageHandler. java: 169) \ tat org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch (AbstractDispatcher. java: 115) \ tat org.schering.DatisD UnicastingDispatcher. java: 132) \ tat org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch (UnicastingDispatcher. java: 105) \ tat org.springframework.integration.channel.AbstractSubscribeableChannel.doSendable: SubchanableChannel.doSend (Аннотация 73) \ tat org.springframework.integration.channel.AbstractMessageChannel.send (AbstractMessageChannel. java: 453) \ tat org.springframework.integration.channel.AbstractMessageChannel.send (AbstractMessageChannel. java в 401) .reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl. java: 62) \ tat sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl. java: 43) \ tat * 1035. 1036 *: 498) \ tat org.sprin gframework.web.method.support.InvocableHandlerMethod.doInvoke (InvocableHandlerMethod. java: 190) \ tat org.springframework.web.method.support.InvocableHandlerMethod. web.servlet. mvc .method.annotation.ServletInvocableHandlerMethod.invokeAndHandle (ServletInvocableHandlerMethod. java: 104) \ tat org.springframework.web.servlet. mvc. *: 892) \ tat org.springframework.web.servlet. mvc .method.annotation.RequestMappingHandlerAdapter.handleInternal (RequestMappingHandlerAdapter. java: 797) \ tat org.springframework.web.servlet. mvc .met. AbstractHandlerMethodAdapter.handle (AbstractHandlerMethodAdapter. java: 87) \ tat org.springframework.web.servlet.DispatcherServlet.doDispatch (DispatcherServlet. java: 1039) \ tat org. springframework.web.servlet.DispatcherServlet.doService (DispatcherServlet. java: 942) \ tat org.springframework.web.servlet.FrameworkServlet.processRequest (FrameworkServlet. java: 1005) \ tat org.sserv. FrameworkServlet.doPost (FrameworkServlet. java: 908) \ tat javax.servlet.http.HttpServlet.service (HttpServlet. java: 660) \ tat org.springframework.web.servlet.FrameworkServlet.service. *: 882) \ tat javax.servlet.http.HttpServlet.service (HttpServlet. java: 741) \ tat org. apache .catalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain. java: 231) \ org. apache .catalina.core.ApplicationFilterChain.doFilter (ApplicationFilterChain. java: 166) \ tat org. apache .tomcat.websocket.server.WsFilter.doFilter (WsFilter. java: 53) \ tat org. apache .catalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain. java: 193) \ tat org. apache .catalina.core.ApplicationFilterChain.doFilter (ApplicationFilterChain. java: 166) springframework.bo ot.actuate.web.trace.servlet.HttpTraceFilter.doFilterInternal (HttpTraceFilter. java: 90) \ tat org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter. \ 1066 *) 1066 или java * .catalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain. java: 193) \ tat org. apache .catalina.core.ApplicationFilterChain.doFilter (ApplicationFilterChain. java: 166) \ tatwork.pr. web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy. java: 320) \ tat org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke (FilterSecurityInterceptor. java: 127sec. web.access.intercept.FilterSecurityInterceptor.doFilter (FilterSecurityInterceptor. java: 91) \ tat org.springframework.security.web. web.access.ExceptionTranslationFilter.doFilter (ExceptionTranslationFilter. java: 119) \ tat org.springframewo rk.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy. java: 334) \ tat org.springframework.security.web.session.SessionManagementFilter.doFilter (SessionManagementFilter. java: pr. security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy. java: 334) \ tat org.springframework.security.web.authentication. web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy. java: 334) \ tat org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter (SecurityContextHolderAware.web. FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy. java: 334) \ tat org.springframework.security.web.savedrequest. RequestCacheAwareFilter.doFilter (RequestCacheAwareFilter. java: 63) VirtualFi lterChain.doFilter (FilterChainProxy. java: 334) \ tat org.springframework.security.oauth2.provider.authentication.OAuth2AuthenticationProcessingFilter.doFilter (OAuth2AuthenticationProcessingFilter. java: 176 VirtualFilterChain.doFilter (FilterChainProxy. java: 334) \ tat org.springframework.security.web.authentication.logout.LogoutFilter. VirtualFilterChain.doFilter (FilterChainProxy. java: 334). OncePerRequestFilter. java: 107) \ tat org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy. java: 334) \ tat org.springframework.security.web.context. java: 105) \ tat org. springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy. java: 334) \ tat org.springframework.security.web.context.request.asyn c .WebAsyncManagerIntegrationFilterFilter .InterFilter. ) \ tat org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter. java: 107) \ tat org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (334). .springframework.security.web.FilterChainProxy.doFilterInternal (FilterChainProxy. java: 215) \ tat org.springframework.security.web. .DelegatingFilterProxy.invokeDelegate (DelegatingFilterProxy. java: 357) \ tat org.springframework.web.filter.DelegatingFilterProxy.doFilter (DelegatingFilterProxy. java: 270) \ tat org. apache. (ApplicationFilterChain. java: 193) \ tat org. apache .catalina.core .ApplicationFilterChain.doFilter (ApplicationFilterChain. java: 166) \ tat org.springframework.web.filter.RequestContextFilter.doFilterInternal (RequestContextFilter. java: 99) \ tat org.springframeililFir OnceFirterFirterFerterFerterFerterFirf . java: 107) \ tat org. apache .catalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain. java: 193) \ tat org. apache .catalina.core.ApplicationFilterChain.doFilter (ApplicationFilterCha 1110 *: 166) \ tat org.springframework.web.filter.FormContentFilter.doFilterInternal (FormContentFilter. java: 92) \ tat org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequest *): 107.ter. tat org. apache .catalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain. java: 193) \ tat org. apache .catalina.core.ApplicationFilterChain.doFilter (ApplicationFilterChain. javag: 166): 166 .springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal (HiddenHttpMethodFilter. java: 93) \ tat org.springframework .web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter. java: 107) \ tat org. apache .catalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain. java: 193) \ tat org. * org. .core.ApplicationFilterChain. .actuate.metrics.web.servlet.WebMvcMetricsFilter.doFilterInternal (WebMvcMetricsFilter. java: 106) \ tat org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter: 11 * 11 * 11). .catalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain. java: 193) \ tat org. apache .catalina.core.ApplicationFilterChain.doFilter (ApplicationFilterChain. java: 166) \ tat orgwork.prf. .CharacterEncodingFilter.doFilterInternal (CharacterEncodingFilter. java: 200) \ tat org.springframework.web.filter.OncePerRequestFilter.d oFilter (OncePerRequestFilter. java: 107) \ tat org. apache .catalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain. java: 193) \ tat org. apache .catalina.core.ApplicationFilterCilhaFilter ApplicationFilterChain. java: 166) \ tat org.cloudfoun dry .router.ClientCertificateMapper.doFilter (ClientCertificateMapper. java: 77) \ tat org. apache .catalina.core.ApplicationFilterChain.inilFilter (ApplicationFilterChain.inilF). 1139 *: 193) \ tat org. apache .catalina.core.ApplicationFilterChain.doFilter (ApplicationFilterChain. java: 166) \ tat org. apache .catalina.core.StandardWrapperValve.invoke (StandardWrapperValve * * : 200) \ tat org. apache .catalina.core.StandardContextValve.invoke (StandardContextValve. java: 96) \ tat org. apache .catalina.authenticator.AuthenticatorBase.invoke (AuthenticatorBase. java: 490 ) \ tat org. apache .catalina.core.StandardHostValve.invoke (StandardHostValve. java: 139) \ tat org. apache .catalina.valves.ErrorReportValve.invoke (ErrorReportValve. java: 92) \ тат орг. apache. catalina.core.StandardEngineValve.invoke (StandardEngineValve. java: 74) \ tat org. apache .catalina.valves.RemoteIpValve.invoke (RemoteIpValve. java: 679) \ tat org. apache .catalina. connector.CoyoteAdapter.service (CoyoteAdapter. java: 343) \ tat org. apache .coyote.http11.Http11Processor.service (Http11Processor. java: 408) \ tat org. apache .coyote.AbstractProcessorLight. process (AbstractProcessorLight. java: 66) \ tat org. apache .coyote.AbstractProtocol $ ConnectionHandler.process (AbstractProtocol. java: 836) \ tat org. apache .tomcat.util. net. NioEndpoint $ SocketProcessor.doRun (NioEndpoint. java: 1747) \ tat org. apache .tomcat.util. net .SocketProcessorBase.run (SocketProcessorBase. java: 49) \ tat java .util. concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor. java: 1149) \ tat java .util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor. java: 624) \ tat

...