Существует ли какая-либо конфигурация xml для адаптера входящего канала в весенней интеграции kafka версии 3.x - PullRequest
0 голосов
/ 07 мая 2019

В нашем приложении мы используем интеграцию Spring 5.1.4 и интеграцию spring-boot-starter-интеграция 2.1.4. Мы используем конфигурацию XML для удобства, чтобы увидеть график интеграции. Теперь нам нужно прочитать сообщения из темы kafka, поэтому мы хотим использовать последнюю версию spring -gration-kafka 3.1.2.RELEASE и адаптер входящего канала kafka. Я мог бы найти примеры конфигураций XML с использованием версий Spring -gration-kafka 1.x, но не смог найти конфигурацию XML для последних версий? Если я использую более старую конфигурацию xml с версией 3.x, выдается сообщение об ошибке «Не найдено декларации для элемента int-kafka: zookeeper-connect». Может кто-нибудь помочь нам указать нам, что не так с матрицей совместимости версий, или предоставить некоторые Пример конфигурации xml для адаптера входящего канала 3.1.2 kafka для чтения из раздела kafka.

<int-kafka:zookeeper-connect
    id="zookeeperConnect" zk-connect="localhost:2181"
    zk-connection-timeout="6000" zk-session-timeout="6000"
    zk-sync-time="2000" />

<int-kafka:inbound-channel-adapter
    id="kafkaInboundChannelAdapter"
    kafka-consumer-context-ref="consumerContext" auto-startup="true"
    channel="inputFromKafka">
    <int:poller fixed-delay="2000" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>

<bean id="consumerProperties"
    class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="auto.offset.reset">smallest</prop>
            <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
            <prop key="fetch.message.max.bytes">5242880</prop>
            <prop key="auto.commit.interval.ms">1000</prop>
        </props>
    </property>
</bean>

<int-kafka:consumer-context
    id="consumerContext" consumer-timeout="1000"
    zookeeper-connect="zookeeperConnect"
    consumer-properties="consumerProperties">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration
            group-id="Group1" max-messages="5000"
            key-decoder="deccoder" value-decoder="deccoder">
            <int-kafka:topic id="Helloworld-Topic" streams="3" />
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>

<bean id="deccoder"
    class="org.springframework.integration.kafka.serializer.common.StringDecoder" />

1 Ответ

0 голосов
/ 07 мая 2019

См. документацию (глава Spring для Apache Kafka) .

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        mode="record"
        retry-template="template"
        recovery-callback="callback"
        error-message-strategy="ems"
        channel="someChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="localhost:9092" />
                ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.config.ContainerProperties">
            <constructor-arg name="topics" value="foo" />
        </bean>
    </constructor-arg>

</bean>
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...