У меня есть пример проекта для изучения весны с Кафкой (найти здесь ). У меня есть слушатель, подписывающийся на topi c my-test-topi c -upstream , который просто потеряет сообщение и ключ и опубликует sh то же самое для другого topi c my -test-топи c -downstream . Я попробовал это локальная кафка (docker -композит файл там) и все работает.
Теперь я пытаюсь написать тест для этого, используя встроенный сервер kafka. На тесте у меня запускается встроенный сервер ( TestContext. java), который должен запускаться перед тестом (переопределенный junit beforeAll ).
private static EmbeddedKafkaBroker kafka() {
EmbeddedKafkaBroker kafkaEmbedded =
new EmbeddedKafkaBroker(
3,
false,
1,
"my-test-topic-upstream", "my-test-topic-downstream");
Map<String, String> brokerProperties = new HashMap<>();
brokerProperties.put("default.replication.factor", "1");
brokerProperties.put("offsets.topic.replication.factor", "1");
brokerProperties.put("group.initial.rebalance.delay.ms", "3000");
kafkaEmbedded.brokerProperties(brokerProperties);
try {
kafkaEmbedded.afterPropertiesSet();
} catch (Exception e) {
throw new RuntimeException(e);
}
return kafkaEmbedded;
}
Затем я создаю продюсера ( TickProducer ) и публикую sh сообщение для topi c, которое, как я ожидаю, сможет прослушать мой слушатель.
public TickProducer(String brokers) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<>(props);
}
public RecordMetadata publishTick(String brand)
throws ExecutionException, InterruptedException {
return publish(TOPIC, brand, Instant.now().toString());
}
private RecordMetadata publish(String topic, String key, String value)
throws ExecutionException, InterruptedException {
final RecordMetadata recordMetadata;
recordMetadata = producer.send(new ProducerRecord<>(topic, key, value)).get();
producer.flush();
return recordMetadata;
}
Я вижу, что следующее сообщение журнала продолжает регистрироваться.
11:32:35.745 [main] WARN o.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=my-test-group] Connection to node -1 could not be established. Broker may not be available.
окончательно завершается с
11:36:52.774 [main] ERROR o.s.boot.SpringApplication - Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Любые подсказки здесь?