Несколько StreamListeners к одному Topi c с Spring Cloud Stream, подключенным к Kafka - PullRequest
0 голосов
/ 19 июня 2020

У меня есть приложение Spring Boot. Я использую Spring Cloud Stream для подключения к Kafka. Я пытаюсь настроить два отдельных метода прослушивания потока для одного и того же kafka topi c.

@StreamListener("countries")
    @SendTo("aggregated-statistic")
    public KStream<?, AggregatedCountry> process(KStream<Object, Country> input) {
        return input
                .groupBy((key, value) -> value.getCountryCode())
                .aggregate(this::initialize,
                        this::aggregateAmount,
                        materializedAsPersistentStore("countries", Serdes.String(),
                                Serdes.serdeFrom(new JsonSerializer<>(),
                                        new JsonDeserializer<>(AggregatedCountry.class))))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, value));
    }
    @StreamListener("countries")
    @SendTo("daily-statistic")
    public KStream<?, List<DailyStatistics>> daily(KStream<Object, Country> input) {
        return input
                .groupBy((key, value) -> value.getCountryCode())
                .aggregate(this::initializeDailyStatistics,
                        this::dailyStatistics,
                        materializedAsPersistentStore("daily", Serdes.String(),
                                Serdes.serdeFrom(new JsonSerializer<>(),
                                        new JsonDeserializer<>(List.class))))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, value));
    }

Но когда я запускаю приложение Spring Boot, я получаю эту ошибку.

Exception in thread "kafka-stream-f4f8166b-cbeb-42ca-b461-2b3a23885a5d-StreamThread-1" java.lang.IllegalStateException: Consumer was assigned partitions [kafka-stream-daily-repartition-0] which didn't correspond to subscription request [kafka-stream-countries-repartition, countries]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.handleAssignmentMismatch(ConsumerCoordinator.java:218)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)

Думаю, мне нужен отдельный идентификатор приложения для каждого метода StreamListener, но как я могу настроить его в файле application.yml, если я слушаю один и тот же topi c?

1 Ответ

1 голос
/ 19 июня 2020

Вам необходимо предоставить две отдельные привязки ввода (и обе из них могут указывать на один и тот же topi c). Вы не можете использовать одно и то же имя привязки на нескольких StreamListener s. Затем вы можете установить application.id для нескольких процессоров на основе StreamListener во входных привязках. Например,

spring.cloud.stream.kafka.streams.bindings.countries1.consumer.applicationId

и

spring.cloud.stream.kafka.streams.bindings.countries2.consumer.applicationId

См. в этом разделе из справочной документации.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...