Несколько потоковых прослушивателей с Spring Cloud Stream, подключенных к Kafka - PullRequest
0 голосов
/ 15 октября 2018

В приложении Spring Boot, использующем Spring Cloud Stream для подключения к Kafka, я пытаюсь настроить два отдельных метода прослушивания потока:

  • Один читает из тем "t1" и "t2" какKTables, перераспределение с использованием другого ключа в одном, затем присоединение к данным из другого
  • Другой читает из несвязанной темы, "t3", как KStream.

Поскольку первый слушатель выполняет некоторое объединение и агрегирование, некоторые темы создаются автоматически, например, «test-1-KTABLE-AGGREGATE-STATE-STORE-0000000007-repartition-0».(Не уверен, связано ли это с проблемой.)

Когда я настраиваю код с помощью двух отдельных методов, помеченных @StreamListener, я получаю сообщение об ошибке ниже, когда запускается приложение Spring Boot:

Exception in thread "test-d44cb424-7575-4f5f-b148-afad034c93f4-StreamThread-2" java.lang.IllegalArgumentException: Assigned partition t1-0 for non-subscribed topic regex pattern; subscription pattern is t3
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignFromSubscribed(SubscriptionState.java:195)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:225)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:848)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:771)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:741)

Я думаю, что важная часть: «Назначенный раздел t1-0 для шаблона регулярного выражения для неподписанной темы; шаблон подписки t3 ».Это две не связанные между собой темы, поэтому, насколько я понимаю, ничего не связанного с t3, следует подписываться на что-либо, связанное с t1.Точная тема, которая вызывает проблему, также периодически меняется: иногда упоминается одна из автоматически генерируемых тем, а не само t1.

Вот как настроены два потоковых слушателя (в Kotlin):

@StreamListener
fun listenerForT1AndT2(
        @Input("t1") t1KTable: KTable<String, T1Obj>,
        @Input("t2") t2KTable: KTable<String, T2Obj>) {

    t2KTable
        .groupBy(...)
        .aggregate(
                { ... },
                { ... },
                { ... },
                Materialized.with(Serdes.String(), JsonSerde(SomeObj::class.java)))
        .join(t1KTable,
                { ... },
                Materialized.`as`<String, SomeObj, KeyValueStore<Bytes, ByteArray>>("test")
                        .withKeySerde(Serdes.String())
                        .withValueSerde(JsonSerde(SomeObj::class.java)))
}

@StreamListener
fun listenerForT3(@Input("t3") t3KStream: KStream<String, T3Obj>) {
    events.map { ... }
}

Однако, когда я настраивал свой код, используя только один метод, помеченный @StreamListener, и принимал параметры для всех трех тем, все отлично работало, например,

@StreamListener
fun compositeListener(
        @Input("t1") t1KTable: KTable<String, T1Obj>,
        @Input("t2") t2KTable: KTable<String, T2Obj>,
        @Input("t3") t3KStream: KStream<String, T3Obj>) {
    ...
}

НоЯ не думаю, что это правильно, что у меня может быть только один @StreamListener метод.

Я знаю, что есть маршрутизация на основе содержимого для добавления условий к аннотации StreamListener, ноесли методы определяют входные каналы, то я не уверен, нужно ли мне использовать это здесь - я бы подумал, что использование аннотаций @Input для параметров метода будет достаточно, чтобы сообщить системе, какие каналы (ипоэтому к каким темам кафки) привязываться?Если мне do нужно использовать маршрутизацию на основе содержимого, как я могу применить ее здесь, чтобы каждый метод получал только элементы из соответствующих тем?

Я также пробовалразделение двух методов слушателя на два отдельных класса, каждый из которых имеет @EnableBinding только для интересующего его интерфейса (т.е. один интерфейс для t1 и t2 и отдельный интерфейс для t3), но это не помогает.

Все остальное, что я нашел, связанное с этим сообщением об ошибке, например, здесь , касается нескольких экземпляров приложения, но в моем случае есть только один экземпляр приложения Spring Boot.

1 Ответ

0 голосов
/ 16 октября 2018

Вам нужен отдельный идентификатор приложения для каждого StreamListener метода.Вот пример:

spring.cloud.stream.kafka.streams.bindings.t1.consumer.application-id=processor1-application-id spring.cloud.stream.kafka.streams.bindings.t2.consumer.application-id=processor1-application-id spring.cloud.stream.kafka.streams.bindings.t3.consumer.application-id=processor2-application-id

Возможно, вы хотите протестировать последний снимок (2.1.0), так как произошли некоторые недавние изменения в том, как идентификатор приложения обрабатывается связывателем,

Пожалуйста, смотрите обновление здесь для более подробной информации.Вот рабочий пример нескольких StreamListener методов, которые являются процессорами Kafka Streams.

...