Это продолжение нескольких других вопросов SO, касающихся использования подпружиненной и подпружиненной кафки.
Намерение:
Цель состоит в том, чтобы настроить цепочку вызовов следующим образом (упрощенное представление):
Мастер вызывает шаг раба:
<master job> -> partitioner (MessageChannelPartitionHandler) +aggregator -> messagingTemplate -> outbound-requests (Channel) -> request-outbound-staging (KafkaProducerMessageHandler) -> kafka
Слушатель Кафки отвечает на сообщение и запускает шаг подчиненного работника
kafka -> inbound-request-listener (MessageDrivenChannelAdapter) -> inbound-requests (channel) -> worker-container (KafkaMessageListenerContainer) -> stepExecutionRequestHandler <slave step>
Весенние партии возвращаются в Kafka
stepExecutionRequestHandler <slave step> -> stepMessagingTemplate -> outbound-replies (Channel) -> reply-outbound-staging (KafkaProducerMessageHandler) -> kafka
Слушатель Kafka возвращает ответы агрегатору и секционеру
kafka -> inbound-replies (MessageDrivenChannelAdapter) -> partitioner (MessageChannelPartitionHandler) +aggregator -> <master job>
История
После первоначальной разработки конфигурации интегрирования пружин с kafka компоненты пакетной пружины на ведомой стороне процесса не находили шаги, когда слушатель был запущен.
Мы произвели рефакторинг компонентов Spring-Batch и Spring-Integration, которые управляют ими, в конечном итоге переместив их и компоненты-слушатели из Java DSL в шаг slave XML.
Текущий статус:
После рефакторинга слушатель кафки, похоже, больше не отвечает. Единственный признак - отсутствие ответа от подчиненного процесса и таймер агрегатора.
Java DSL config:
@Configuration
@Order(6)
@EnableIntegration
@EnableKafka
@IntegrationComponentScan
public class QueueingConfig {
private static final int MXMODULE = 400;
private static final String JOB_CONTROL_TOPIC = "job.control";
private static final String STEP_EXECUTION_TOPICS = "job.step";
private static final String STEP_REPLY_TOPICS = "job.step.reply";
Фрагменты XML-конфигурации:
<bean id="worker-container" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="192.168.2.127:9092" /> <!-- needs to come from factory bean -->
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
<entry key="value.deserializer" value="org.springframework.kafka.support.serializer.JsonDeserializer"/>
<entry key="group.id" value="batch"/>
<entry key="spring.json.trusted.packages" value="com.mypackage,org.springframework.batch.integration.partition"/>
<entry key="max.poll.records" value="10"/>
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.ContainerProperties">
<constructor-arg name="topics" value="job.step" />
</bean>
</constructor-arg>
</bean>
<int-kafka:message-driven-channel-adapter
id="inboundKafkaRequests"
send-timeout="5000"
mode="record"
channel="inbound-requests"
auto-startup="true"
listener-container="worker-container"
/>
Предыдущие исследования:
Spring Integration Kafka Consumer Listener не получает сообщения
Как преобразовать эту конфигурацию Spring-интеграции из XML в Java?
брокер kafka недоступен при запуске
Редактировать: Обновить
Во время обновления патча linux файл конфигурации kafka был перезаписан файлом по умолчанию. Я восстановил правильную конфигурацию и кафка возобновила работу.
В процессе написания этого вопроса я формализовал большую часть пружинной проводки. Процесс формализации помог выявить некоторые проблемы с несвязанными компонентами интеграции.