Мне трудно понять некоторые концепции Kafka в Java Spring Boot.Я хотел бы протестировать потребителя с реальным брокером Kafka, работающим на сервере, у которого есть несколько производителей, которые пишут / уже писали данные в различные темы.Я хотел бы установить соединение с сервером, использовать данные и проверять или обрабатывать их содержимое в тесте.
Подавляющее большинство примеров (на самом деле все, что я видел до сих пор) в Интернете относятся квстроенный kafka, EmbeddedKafkaBroker, и показать как производителя, так и потребителя, реализованных на одной машине, локально.Я не нашел ни одного примера, который объяснил бы, как установить соединение с удаленным сервером kafka и прочитать данные из определенной темы.Я написал некоторый код и напечатал адрес посредника с помощью:
System.out.println(embeddedKafkaBroker.getBrokerAddress(0));
Получил 127.0.0.1:9092, что означает, что он локальный, поэтому соединение с удаленным сервером имеетне установлено.
С другой стороны, когда я запускаю SpringBootApplication, я получаю полезную нагрузку от удаленного брокера.
Получатель:
@Component
public class Receiver {
private static final String TOPIC_NAME = "X";
private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);
private CountDownLatch latch = new CountDownLatch(1);
public CountDownLatch getLatch() {
return latch;
}
@KafkaListener(topics = TOPIC_NAME)
public void receive(final byte[] payload) {
LOGGER.info("received the following payload: '{}'", payload);
latch.countDown();
}
}
Конфиг:
@EnableKafka
@Configuration
public class ByteReceiverConfig {
@Autowired
EmbeddedKafkaBroker kafkaEmbeded;
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupIdConfig;
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
ConsumerFactory<Object, Object> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProperties());
}
@Bean
Map<String, Object> consumerProperties() {
final Map<String, Object> properties =
KafkaTestUtils.consumerProps("junit-test", "true", this.kafkaEmbeded);
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
return properties;
}
Тест:
@EnableAutoConfiguration
@EnableKafka
@SpringBootTest(classes = {ByteReceiverConfig.class, Receiver.class})
@EmbeddedKafka
@ContextConfiguration(classes = ByteReceiverConfig.class)
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.group-id=EmbeddedKafkaTest"})
public class KafkaTest {
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
EmbeddedKafkaBroker embeddedKafkaBroker;
@Autowired
Receiver receiver;
@BeforeEach
void waitForAssignment() {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
System.out.println(messageListenerContainer.getAssignedPartitions().isEmpty());
System.out.println(messageListenerContainer.toString());
System.out.println(embeddedKafkaBroker.getTopics().size());
System.out.println(embeddedKafkaBroker.getPartitionsPerTopic());
System.out.println(embeddedKafkaBroker.getBrokerAddress(0));
System.out.println(embeddedKafkaBroker.getBrokersAsString());
ContainerTestUtils.waitForAssignment(messageListenerContainer,
embeddedKafkaBroker.getPartitionsPerTopic());
}
@Test
public void testReceive() {
}
}
Мне бы хотелось, чтобы кто-то пролил некоторый свет на следующие проблемы:
1. Можно ли использовать экземпляр класса EmbeddedKafkaBroker для тестированияданные, которые поступают от удаленного брокера, или они используются только для локальных тестов, в которых я бы прокументировал, то есть отправил данные в созданную мной тему и сам использовал данные?
2. Можно ли написатьтестовый класс для реального кафки сервера?Например, чтобы проверить, было ли установлено соединение, или данные были прочитаны из определенной темы.Какие аннотации, конфигурации и классы потребуются в таком случае?
3.Если я хочу использовать только данные, мне нужно предоставить конфигурацию производителя в файле конфигурации (это было бы странно, но всепримеры, с которыми я сталкивался до сих пор, сделал это)?
4. Знаете ли вы какие-либо ресурсы (книги, веб-сайты и т. д.), которые показывают реальные примеры использования kafka, то есть с удаленным сервером kafka, с прокудером или потребителем?только