Весна интеграции кафка - слушатель не отвечает - как я могу понять, почему? - PullRequest
0 голосов
/ 30 апреля 2019

Это продолжение нескольких других вопросов SO, касающихся использования подпружиненной и подпружиненной кафки.

Намерение:

Цель состоит в том, чтобы настроить цепочку вызовов следующим образом (упрощенное представление):

Мастер вызывает шаг раба:

<master job> -> partitioner (MessageChannelPartitionHandler) +aggregator -> messagingTemplate -> outbound-requests (Channel) -> request-outbound-staging (KafkaProducerMessageHandler) -> kafka

Слушатель Кафки отвечает на сообщение и запускает шаг подчиненного работника

kafka -> inbound-request-listener (MessageDrivenChannelAdapter) -> inbound-requests (channel) -> worker-container (KafkaMessageListenerContainer) -> stepExecutionRequestHandler <slave step>

Весенние партии возвращаются в Kafka

stepExecutionRequestHandler <slave step> -> stepMessagingTemplate -> outbound-replies (Channel) -> reply-outbound-staging (KafkaProducerMessageHandler) -> kafka

Слушатель Kafka возвращает ответы агрегатору и секционеру

kafka -> inbound-replies (MessageDrivenChannelAdapter) -> partitioner (MessageChannelPartitionHandler) +aggregator -> <master job>

История

После первоначальной разработки конфигурации интегрирования пружин с kafka компоненты пакетной пружины на ведомой стороне процесса не находили шаги, когда слушатель был запущен.

Мы произвели рефакторинг компонентов Spring-Batch и Spring-Integration, которые управляют ими, в конечном итоге переместив их и компоненты-слушатели из Java DSL в шаг slave XML.

Текущий статус:

После рефакторинга слушатель кафки, похоже, больше не отвечает. Единственный признак - отсутствие ответа от подчиненного процесса и таймер агрегатора.

Java DSL config:

@Configuration
@Order(6)
@EnableIntegration
@EnableKafka
@IntegrationComponentScan
public class QueueingConfig {
    private static final int MXMODULE = 400;
    private static final String JOB_CONTROL_TOPIC = "job.control";
    private static final String STEP_EXECUTION_TOPICS = "job.step";
    private static final String STEP_REPLY_TOPICS = "job.step.reply";

Фрагменты XML-конфигурации:

<bean id="worker-container" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                <entry key="bootstrap.servers" value="192.168.2.127:9092" />  <!-- needs to come from factory bean -->
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
                <entry key="value.deserializer" value="org.springframework.kafka.support.serializer.JsonDeserializer"/>
                <entry key="group.id" value="batch"/>
                <entry key="spring.json.trusted.packages" value="com.mypackage,org.springframework.batch.integration.partition"/>
                <entry key="max.poll.records" value="10"/>
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.ContainerProperties">
            <constructor-arg name="topics" value="job.step" />
        </bean>
    </constructor-arg>
</bean>


<int-kafka:message-driven-channel-adapter
    id="inboundKafkaRequests"
    send-timeout="5000"
    mode="record"
    channel="inbound-requests"
    auto-startup="true"
    listener-container="worker-container" 
    />

Предыдущие исследования:

  1. Spring Integration Kafka Consumer Listener не получает сообщения

  2. Как преобразовать эту конфигурацию Spring-интеграции из XML в Java?

  3. брокер kafka недоступен при запуске


Редактировать: Обновить

Во время обновления патча linux файл конфигурации kafka был перезаписан файлом по умолчанию. Я восстановил правильную конфигурацию и кафка возобновила работу.

В процессе написания этого вопроса я формализовал большую часть пружинной проводки. Процесс формализации помог выявить некоторые проблемы с несвязанными компонентами интеграции.

...