Весенний облачный поток / Кафка исключения - PullRequest
0 голосов
/ 11 сентября 2018

У меня проблемы с сервисом, который использует весенний облачный поток и кафку. Служба работала нормально, но вчера начал сообщать о серии исключений при запуске:

Checking for rethrow: count=2
2018-09-11 10:43:34.904 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Retry: count=2
2018-09-11 10:43:34.904 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.integration.channel.DirectChannel    : preSend on channel 'payment-reply', message: GenericMessage [payload=byte[1478], headers={errorChannel=e61450f9-fa47-446f-95ae-5021868cadfa:602, deliveryAttempt=3, X-B3-ParentSpanId=a9fe9b1c87b14698, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedTopic=paymentResponse, spanTraceId=966a10371583367f, spanId=7aa71302bc18bb4c, spanParentSpanId=a9fe9b1c87b14698, replyChannel=e61450f9-fa47-446f-95ae-5021868cadfa:601, nativeHeaders={spanTraceId=[966a10371583367f], spanId=[7aa71302bc18bb4c], spanParentSpanId=[a9fe9b1c87b14698], spanSampled=[0]}, kafka_offset=2299, X-B3-SpanId=7aa71302bc18bb4c, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2fb81502, X-B3-Sampled=0, X-B3-TraceId=966a10371583367f, spanSampled=0, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1536592853999}]
2018-09-11 10:43:34.904 DEBUG [payment-gateway,966a10371583367f,c94b21ccaaed668b,false] 1 --- [container-0-C-1] o.s.c.s.i.m.TracingChannelInterceptor    : Created a new span in pre sendNoopSpan{context=966a10371583367f/c94b21ccaaed668b}
2018-09-11 10:43:34.905 DEBUG [payment-gateway,966a10371583367f,4476713d70434d52,false] 1 --- [container-0-C-1] o.s.c.s.i.m.TracingChannelInterceptor    : Created a new span in before handleNoopSpan{context=966a10371583367f/e1d1a2a6b9ad093e}
2018-09-11 10:43:34.905 DEBUG [payment-gateway,966a10371583367f,4476713d70434d52,false] 1 --- [container-0-C-1] o.s.c.s.i.m.TracingChannelInterceptor    : Will finish the current span after message handled NoopSpan{context=966a10371583367f/4476713d70434d52}
2018-09-11 10:43:34.905 DEBUG [payment-gateway,966a10371583367f,c94b21ccaaed668b,false] 1 --- [container-0-C-1] o.s.c.s.i.m.TracingChannelInterceptor    : Will finish the current span after completion NoopSpan{context=966a10371583367f/c94b21ccaaed668b}
2018-09-11 10:43:34.905 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Received: 0 records
2018-09-11 10:43:35.001 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {}
2018-09-11 10:43:35.002 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-2, groupId=payment-gateway] Fetch READ_UNCOMMITTED at offset 0 for partition refundResponse-0 returned fetch data (error=NONE, highWaterMark=0, lastStableOffset = -1, logStartOffset = 0, abortedTransactions = null, recordsSizeInBytes=0)
2018-09-11 10:43:35.002 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-2, groupId=payment-gateway] Added READ_UNCOMMITTED fetch request for partition refundResponse-0 at offset 0 to node 10.244.0.194:9092 (id: 2 rack: null)
2018-09-11 10:43:35.002 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-2, groupId=payment-gateway] Sending READ_UNCOMMITTED fetch for partitions [refundResponse-0] to broker 10.244.0.194:9092 (id: 2 rack: null)
2018-09-11 10:43:35.003 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=3
2018-09-11 10:43:35.003 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Retry failed last attempt: count=3
2018-09-11 10:43:35.004 DEBUG [payment-gateway,,,] 1 --- [container-0-C-1] o.s.i.h.a.ErrorMessageSendingRecoverer   : Sending ErrorMessage: failedMessage: GenericMessage [payload=byte[1478], headers={errorChannel=e61450f9-fa47-446f-95ae-5021868cadfa:602, deliveryAttempt=3, X-B3-ParentSpanId=7aa71302bc18bb4c, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=paymentResponse, spanTraceId=966a10371583367f, spanId=c94b21ccaaed668b, spanParentSpanId=7aa71302bc18bb4c, replyChannel=e61450f9-fa47-446f-95ae-5021868cadfa:601, nativeHeaders={spanTraceId=[966a10371583367f], spanId=[c94b21ccaaed668b], spanParentSpanId=[7aa71302bc18bb4c], spanSampled=[0]}, kafka_offset=2299, X-B3-SpanId=c94b21ccaaed668b, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2fb81502, X-B3-Sampled=0, X-B3-TraceId=966a10371583367f, id=83994228-ba45-2303-1f7e-2eaf8f49c400, spanSampled=0, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1536592853999, timestamp=1536662614904}]
2018-09-11 08:44:19.837 ERROR [payment-gateway,bd9888a7d590ebf7,535db983ae0aedab,false] 1 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : 
org.springframework.messaging.MessageDeliveryException: 
Dispatcher has no subscribers for channel 'application-1.payment-reply'.; nested exception is org.springframework.integration.MessageDispatchingException: 
Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[1197], headers={errorChannel=e61450f9-fa47-446f-95ae-5021868cadfa:426, deliveryAttempt=3, X-B3-ParentSpanId=760139e0bc5d9ac0, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=paymentResponse, spanTraceId=bd9888a7d590ebf7, spanId=5c6ac2c521faf6e7, spanParentSpanId=760139e0bc5d9ac0, replyChannel=e61450f9-fa47-446f-95ae-5021868cadfa:425, nativeHeaders={spanTraceId=[bd9888a7d590ebf7], spanId=[535db983ae0aedab], spanParentSpanId=[5c6ac2c521faf6e7], spanSampled=[0], X-B3-TraceId=[bd9888a7d590ebf7], X-B3-SpanId=[535db983ae0aedab], X-B3-ParentSpanId=[5c6ac2c521faf6e7], X-B3-Sampled=[0]}, kafka_offset=2258, X-B3-SpanId=5c6ac2c521faf6e7, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@59715a4a, X-B3-Sampled=0, X-B3-TraceId=bd9888a7d590ebf7, id=88531659-3fb0-a59f-bb69-54c9ba82d608, spanSampled=0, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1536592840192, timestamp=1536655459828}], failedMessage=GenericMessage [payload=byte[1197], headers={errorChannel=e61450f9-fa47-446f-95ae-5021868cadfa:426, deliveryAttempt=3, X-B3-ParentSpanId=760139e0bc5d9ac0, kafka_timestampType=CREATE_TIME, kafka_receivedTopic=paymentResponse, spanTraceId=bd9888a7d590ebf7, spanId=5c6ac2c521faf6e7, spanParentSpanId=760139e0bc5d9ac0, replyChannel=e61450f9-fa47-446f-95ae-5021868cadfa:425, nativeHeaders={spanTraceId=[bd9888a7d590ebf7], spanId=[535db983ae0aedab], spanParentSpanId=[5c6ac2c521faf6e7], spanSampled=[0], X-B3-TraceId=[bd9888a7d590ebf7], X-B3-SpanId=[535db983ae0aedab], X-B3-ParentSpanId=[5c6ac2c521faf6e7], X-B3-Sampled=[0]}, kafka_offset=2258, X-B3-SpanId=5c6ac2c521faf6e7, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@59715a4a, X-B3-Sampled=0, X-B3-TraceId=bd9888a7d590ebf7, id=88531659-3fb0-a59f-bb69-54c9ba82d608, spanSampled=0, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTimestamp=1536592840192, timestamp=1536655459828}]
            at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)

через некоторое время мы видим такие исключения:

Caused by: org.springframework.messaging.core.DestinationResolutionException: failed to look up MessageChannel with name '946859a6-bc27-466d-91ba-3da93af50ac9:1' in the BeanFactory.; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named '946859a6-bc27-466d-91ba-3da93af50ac9:1' available

подключение к кафке настроено со свойством: spring.kafka.bootstrap-server = kafka.kafka:9092

и темы настроены со свойствами весеннего облачного потока: spring.cloud.stream.bindings.[topic-name].destination = blah

Взаимодействие с kafka происходит посредством весенней интеграции с кодом, подобным следующему:

@MessagingGateway
public interface StreamGateway {

    @Gateway(requestChannel = KafkaConfig.ENRICH_PAYMENT, replyChannel = ChannelNames.PAYMENT_REPLY, replyTimeout = 10000)
    String processPayment(String payload);

}

// Другой класс:

private final StreamGateway gateway;
...
gateway.processPayment(message)

Это выполняется в лазурном развертывании kubernetes, а kafka находится в отдельном модуле от службы весенней загрузки.

спасибо заранее.

Обновление: Проблема вновь возникла, и дальнейшее расследование выявило несколько вещей

  • Поскольку мы используем пружинную интеграцию @MessagingGateway и @Gateway для создания синхронного взаимодействия с Kafka, нет нормальной темы StreamListener или подписчика
  • Проблема возникает, когда в теме задержка , т.е. в теме есть сообщения за пределами смещения темы.
  • Отсутствие обычного StreamListener означает, что сообщения о задержке не имеют средств для обработки. Только когда соединение установлено с помощью MessageGateway, сообщения могут быть прочитаны из темы.
  • Одним из способов избавления от проблемы является чтение всех сообщений с задержкой, так что задержка равна 0. Служба будет запускаться нормально, однако, если я вручную отправлю сообщения в тему (вне взаимодействия с MessageGateway ), то ошибка повторяется.
  • Второе частичное решение (которое я пока полностью не понимаю) заключается в добавлении аннотации @DependsOn к MessageGateway, указывающей, что для этого требуется компонент, отдельно созданный с помощью объекта @Input SubscribeableChannel. Это означает, что SubscribeableChannel должен быть создан до MessageGateway, поэтому создается подписчик, однако StreamListener по-прежнему отсутствует, поэтому по-прежнему генерируются исключения при извлечении сообщений задержки из темы без указания места. ?

Ответы [ 2 ]

0 голосов
/ 11 сентября 2018

При просмотре журнала отладки я заметил, что сервис корректно подключался к другим темам, но возникли проблемы с темой оплаты-ответа. Я попытался удалить эту тему и перезапустить сервис. Это решило проблему.

0 голосов
/ 11 сентября 2018

Хотя я не уверен в деталях вашего приложения, ясно, что Сообщение доставляется на канал application-1.payment-reply, у которого, как говорится в ошибке, у нет подписчика .По сути, это означает, что на этом канале нет прослушивателя (например, @StreamListener или @ServiceActivator и т. Д.).

Это очень распространенная неверная конфигурация Spring Integration, но без рассмотрения вашего приложения трудноскажи где это.

...