Микро-сервис Spring-Boot не может отправлять сообщения в тему после перезапуска kafka.
Используя конфигурацию Docker Swarm, я настроил кластер с одним узлом с 1 брокером Kafka и 2 Micro с пружинной загрузкой-услуги (производитель и потребитель).Я использую весеннюю загрузку 2.0.3
Потребитель и производитель (микро-сервисы весенней загрузки) находятся в одной и той же оверлейной сети "net-broker" и поэтому получают доступ к kafka с помощьюимя службы "kafka: 9092"
Все работает нормально при первом запуске.
Затем перезапускается ТОЛЬКО kafka и после этого потребительЯ больше не могу отправлять сообщения из темы kafka.
Служба kafka перезапускается из-за небольшого изменения в docker-compose.yml (например, max_attempts: 3 => max_attempts:4)
Файл docker-compose.yml
kafka:
image: wurstmeister/kafka:2.12-2.2.0
depends_on:
- zookeeper
networks:
- net-broker
deploy:
replicas: 1
update_config:
parallelism: 1
delay: 10s
restart_policy:
condition: on-failure
max_attempts: 3
# ports:
# - target: 9094
# published: 9094
# protocol: tcp
# mode: host
environment:
HOSTNAME_COMMAND: "echo ${HOST_IP:-192.168.99.100}"
KAFKA_CREATE_TOPICS: "gnss-topic-${GNSS_TAG}:3:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
BROKER_ID_COMMAND: "echo 101"
KAFKA_LOG_DIRS: "/kafka/kafka-logs"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- kafka-data:/kafka
Класс KafkaProducerConfig
@Bean
public ProducerFactory<String, GNSSPortHolderDTO> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, gnssConfig.getKafkaBootstapServers());
// high throughput producer (at the expense of a bit of latency and CPU usage)
configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
configProps.put(ProducerConfig.LINGER_MS_CONFIG, "20");
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.toString(32*1024)); // 32 KB batch size
// serializers
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
производитель загрузочной пружины регистрирует после перезагрузки kafka:
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for gnss-topic-11.2.1-B5607-1: 30030 ms has passed since batch creation plus linger time
журналы потребителя весенней загрузки после перезапуска kafka:
gnss_data-access.1.ll948jogpqil@GDN-S-GNSS2 | 2019-05-08 09:42:33.984 INFO 1 --- [ gnss-view-data] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=gnss-view-data] Marking the coordinator fe7091944126:9092 (id: 2147483546 rack: null) dead
Я использую библиотеку 'spring-kafka-2.1.7.RELEASE.jar' для производителя / потребителя micro-services
Используя режим удаленной отладки, я понял, что потребитель пытаетсяотправить сообщение старому «убитому» идентификатору контейнера вместо использования имени службы «kafka: 9092».Я не знаю почему.