Как прослушать несколько тем с помощью StreamListener Spring Cloud Stream и Kafka stream - PullRequest
0 голосов
/ 25 марта 2019

Я хочу прослушать две темы Kafka, как в приведенном ниже коде, и есть два исходных события, которые необходимо обработать и преобразовать в другое событие.

Итак, я хочу прослушать эти два события в одном EnableBinding

    @EnableBinding(PartnerOrderCancelledEventImporter.Targets.class)
    public class PartnerOrderCancelledEventImporter {

       @StreamListener(Targets.INPUT_ORDER_CANCELLED)
        @SendTo(Targets.OUTPUT)
        public KStream<?, TriggeringEvent> processOrderCancelled(KStream<?, OrderCancelledV1> input) {
            LogInfo("PartnerOrderCancelled-OrderCancelledV1 stream started");
            LogInfo("  KafkaBrokers: " + KafkaBrokers);

            return input
                    .filter((key, value) -> IsFFFaultAndNoRoutes(value))
                    .peek((key, value) -> LogInfo("OrderCancelle", new LogObject(value)))
                    .map((key, value) -> KeyValue.pair(key, new TriggeringEvent(value)));
        }

        @StreamListener(Targets.INPUT_ORDER_ITEM_STOCK_CHECKED)
        @SendTo(Targets.OUTPUT)
        public KStream<?, TriggeringEvent> processOrderItemStockChecked(KStream<?, OrderItemStockCheckedV1> input) {
            LogInfo("PartnerOrderCancelled-OrderItemStockCheckedV1 stream started");
            LogInfo("  KafkaBrokers: " + KafkaBrokers);

            return input
                    .filter((key, value) -> IsItemNoStock(value))
                    .peek((key, value) -> LogInfo("OrderItemStockChecke", new LogObject(value)))
                    .map((key, value) -> KeyValue.pair(key, new TriggeringEvent(value)));
        }


        public interface Targets {

            String INPUT_ORDER_CANCELLED = "partnerOrderCancelledInputOrderCancelled";
            String INPUT_ORDER_ITEM_STOCK_CHECKED = "partnerOrderCancelledInputOrderItemStockChecked";
            String OUTPUT = "triggeringEventsOutputPartnerOrderCancelled";

            @Input(INPUT_ORDER_CANCELLED)
            KStream<?, ?> inputOrderCancelled();

            @Input(INPUT_ORDER_ITEM_STOCK_CHECKED)
            KStream<?, ?> inputOrderItemStockChecked();

            @Output(OUTPUT)
            KStream<?, ?> output();
        }
   spring.cloud.stream.bindings.triggeringEventsOutputPartnerOrderCancelled.destination=dev.comms.triggeringevents.TriggeringEvent-events-1.0
    spring.cloud.stream.bindings.partnerOrderCancelledInputOrderItemStockChecked.destination=dev.ecom.order-management-service.order-item-stock-checked-events-v1
    spring.cloud.stream.bindings.partnerOrderCancelledInputOrderItemStockChecked.group=TriggeringEvents-PartnerOrderCancelled
    spring.cloud.stream.bindings.partnerOrderCancelledInputOrderCancelled.destination=dev.ecom.order-management-service.order-cancelled-events-v1
    spring.cloud.stream.bindings.partnerOrderCancelledInputOrderCancelled.group=TriggeringEvents-PartnerOrderCancelled

выброшено ниже исключения

org.springframework.beans.factory.BeanInitializationException: Cannot setup StreamListener for public org.apache.kafka.streams.kstream.KStream com.farfetch.communication.triggeringeventsimporter.PartnerOrderCancelledEventImporter.processOrderItemStockChecked(org.apache.kafka.streams.kstream.KStream); nested exception is java.lang.reflect.UndeclaredThrowableException

    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:195)
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:195)
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.lambda$postProcessAfterInitialization$0(StreamListenerAnnotationBeanPostProcessor.java:167)
    at java.lang.Iterable.forEach(Iterable.java:75)
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.injectAndPostProcessDependencies(StreamListenerAnnotationBeanPostProcessor.java:285)
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated(StreamListenerAnnotationBeanPostProcessor.java:105)
    。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
    at org.springframework.cloud.stream.binder.kafka.streams.KStreamStreamListenerResultAdapter.adapt(KStreamStreamListenerResultAdapter.java:41)
    at org.springframework.cloud.stream.binder.kafka.streams.KStreamStreamListenerResultAdapter.adapt(KStreamStreamListenerResultAdapter.java:31)
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:187)
    ... 39 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapperHandler.invoke(KStreamBoundElementFactory.java:99)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212)
    ... 43 more
Caused by: java.lang.IllegalArgumentException: delegate already set to org.apache.kafka.streams.kstream.internals.KStreamImpl@491cafec

1 Ответ

0 голосов
/ 26 марта 2019

Вы получаете это исключение, потому что вы пытаетесь связать один и тот же исходящий KStream от обоих процессоров (вывод). Есть ли вероятность, что вы можете добавить еще одну выходную привязку для второго процессора? Это должно устранить эту конкретную ошибку, которую вы получаете. С другой стороны, если это требование, которое требует ваш вариант использования, то в настоящее время связыватель не поддерживает это (хотя это может быть хорошей возможностью для добавления). В качестве обходного пути вы можете заставить второй процессор ничего не возвращать, а затем вызвать метод to для KStream, чтобы отправить его вручную.

...