Как распознать сообщение кафки, прочитанное потребителем, используя весеннюю интеграцию кафки - PullRequest
0 голосов
/ 19 мая 2019

Мы используем spring -gration-kafka версии 3.1.2.RELEASE и int-kafka: message-driven-channel-adapter для приема сообщений из темы удаленной kafka. Производитель отправляет зашифрованное сообщение, а мы расшифровываем реальное сообщение с помощью десериализатора. Мы можем использовать все сообщения, размещенные в теме. Мы использовали автоматическую фиксацию как ложную. Мы хотели бы знать, как принять или подтвердить сообщение от нашего сервиса после успешной обработки сообщения. Может ли кто-нибудь помочь нам, как зафиксировать сообщения, прочитанные из канала, управляемого сообщениями, и предоставить некоторую справочную реализацию?

Когда мы устанавливаем значение auto commit на true, мы предполагаем, что оно передаст сообщение после интервала коммита, но мы хотели бы обработать его в нашем сервисе. Я сталкивался с приведенным ниже примером, но после десериализации мы получаем пользовательский объект, а не пружинное сообщение об интеграции. поэтому мы хотели бы знать, как реализовать подобное подтверждение в преобразователе, чтобы мы не фиксировали сообщение в случае каких-либо ошибок во время преобразования. Зафиксируйте сообщение после успешного преобразования.

  Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); 
  if(acknowledgment != null) { System.out.println("Acknowledgment provided");
  acknowledgment.acknowledge(); }
   }



<int-kafka:message-driven-channel-adapter
    id="kafkaMessageListener"
    listener-container="kafkaMessageContainer" auto-startup="true"
    phase="100" send-timeout="5000" mode="record"
    message-converter="messageConverter"
    recovery-callback="recoveryCallback" error-message-strategy="ems"
    channel="inputFromKafkaChannel" error-channel="errorChannel" />

<int:transformer id="transformerid"
    ref="transformerBean"
    input-channel="inputFromKafkaChannel" method="transform"
    output-channel="messageTransformer" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="${spring.kafka.bootstrap-servers}" />
                    <entry key="enable.auto.commit" value="false" />
                    <entry key="auto.commit.interval.ms" value="100" />
                    <entry key="session.timeout.ms" value="15000" />
                    <entry key="group.id" value="${spring.kafka.consumer.group-id}" />
                    <entry key="key.deserializer"
                        value="org.apache.kafka.common.serialization.StringDeserializer" />
                    <entry key="value.deserializer"
                        value="com.test.CustomDeserializer" />
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.ContainerProperties">
            <constructor-arg name="topics" value="${spring.kafka.topics}" />
        </bean>
    </constructor-arg>
</bean>

1 Ответ

0 голосов
/ 19 мая 2019

auto.commit.offset=true означает, что библиотека kafka-clients фиксирует смещения.

Если установлено значение false (предпочтительно для Spring для Apache Kafka), контейнер слушателя фиксирует смещения после каждого пакета, полученного poll()по умолчанию, но механизм управляется свойством AckMode контейнера.

См. Подтверждение фиксации .

Если для контейнера AckMode установлено MANUAL илиMANUAL_IMMEDIATE тогда ваше приложение должно выполнить коммиты, используя объект Acknowledgment.

При использовании Spring Integration объект Acknowledgment доступен в заголовке KafkaHeaders.ACKNOWLEDGMENT.

ВВ большинстве случаев следует использовать AckMode.BATCH (по умолчанию) или AckMode.RECORD, и ваше приложение не должно беспокоиться о фиксации смещений.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...