Как протестировать потребителя кафки против реального брокера кафки, работающего на сервере? - PullRequest
0 голосов
/ 08 мая 2019

Мне трудно понять некоторые концепции Kafka в Java Spring Boot.Я хотел бы протестировать потребителя с реальным брокером Kafka, работающим на сервере, у которого есть несколько производителей, которые пишут / уже писали данные в различные темы.Я хотел бы установить соединение с сервером, использовать данные и проверять или обрабатывать их содержимое в тесте.

Подавляющее большинство примеров (на самом деле все, что я видел до сих пор) в Интернете относятся квстроенный kafka, EmbeddedKafkaBroker, и показать как производителя, так и потребителя, реализованных на одной машине, локально.Я не нашел ни одного примера, который объяснил бы, как установить соединение с удаленным сервером kafka и прочитать данные из определенной темы.Я написал некоторый код и напечатал адрес посредника с помощью:

System.out.println(embeddedKafkaBroker.getBrokerAddress(0));

Получил 127.0.0.1:9092, что означает, что он локальный, поэтому соединение с удаленным сервером имеетне установлено.

С другой стороны, когда я запускаю SpringBootApplication, я получаю полезную нагрузку от удаленного брокера.

Получатель:

@Component
public class Receiver {

private static final String TOPIC_NAME = "X";

private static final Logger LOGGER = LoggerFactory.getLogger(Receiver.class);

private CountDownLatch latch = new CountDownLatch(1);

public CountDownLatch getLatch() {
    return latch;
}

@KafkaListener(topics = TOPIC_NAME)
public void receive(final byte[] payload) {
    LOGGER.info("received the following payload: '{}'", payload);
    latch.countDown();
}
}

Конфиг:

    @EnableKafka
    @Configuration
    public class ByteReceiverConfig {

        @Autowired
        EmbeddedKafkaBroker kafkaEmbeded;

        @Value("${spring.kafka.bootstrap-servers}")
        private String bootstrapServers;

        @Value("${spring.kafka.consumer.group-id}")
        private String groupIdConfig;

        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
            final ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }

        @Bean
        ConsumerFactory<Object, Object> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerProperties());
        }

        @Bean
        Map<String, Object> consumerProperties() {
            final Map<String, Object> properties =
                    KafkaTestUtils.consumerProps("junit-test", "true", this.kafkaEmbeded);
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
            return properties;
        }

Тест:

        @EnableAutoConfiguration
        @EnableKafka
        @SpringBootTest(classes = {ByteReceiverConfig.class, Receiver.class})
        @EmbeddedKafka
        @ContextConfiguration(classes = ByteReceiverConfig.class)
        @TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
                "spring.kafka.consumer.group-id=EmbeddedKafkaTest"})
        public class KafkaTest {


            @Autowired
            private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

            @Autowired
            EmbeddedKafkaBroker embeddedKafkaBroker;


            @Autowired
            Receiver receiver;

            @BeforeEach
            void waitForAssignment() {
                for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
                    System.out.println(messageListenerContainer.getAssignedPartitions().isEmpty());
                    System.out.println(messageListenerContainer.toString());
                    System.out.println(embeddedKafkaBroker.getTopics().size());
                    System.out.println(embeddedKafkaBroker.getPartitionsPerTopic());
                    System.out.println(embeddedKafkaBroker.getBrokerAddress(0));
                    System.out.println(embeddedKafkaBroker.getBrokersAsString());

                    ContainerTestUtils.waitForAssignment(messageListenerContainer,
                            embeddedKafkaBroker.getPartitionsPerTopic());
            }

            @Test
            public void testReceive() {

            }
        }

Мне бы хотелось, чтобы кто-то пролил некоторый свет на следующие проблемы:

1. Можно ли использовать экземпляр класса EmbeddedKafkaBroker для тестированияданные, которые поступают от удаленного брокера, или они используются только для локальных тестов, в которых я бы прокументировал, то есть отправил данные в созданную мной тему и сам использовал данные?

2. Можно ли написатьтестовый класс для реального кафки сервера?Например, чтобы проверить, было ли установлено соединение, или данные были прочитаны из определенной темы.Какие аннотации, конфигурации и классы потребуются в таком случае?

3.Если я хочу использовать только данные, мне нужно предоставить конфигурацию производителя в файле конфигурации (это было бы странно, но всепримеры, с которыми я сталкивался до сих пор, сделал это)?

4. Знаете ли вы какие-либо ресурсы (книги, веб-сайты и т. д.), которые показывают реальные примеры использования kafka, то есть с удаленным сервером kafka, с прокудером или потребителем?только

1 Ответ

0 голосов
/ 08 мая 2019
  1. Вам вообще не нужен встроенный брокер, если вы хотите общаться только с внешним брокером.

  2. Да, просто соответствующим образом установите свойство серверов начальной загрузки.

  3. Нет, вам не нужна конфигурация производителя.

EDIT

@SpringBootApplication
public class So56044105Application {

    public static void main(String[] args) {
        SpringApplication.run(So56044105Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so56044105", 1, (short) 1);
    }

}
spring.kafka.bootstrap-servers=10.0.0.8:9092
spring.kafka.consumer.enable-auto-commit=false
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { So56044105Application.class, So56044105ApplicationTests.Config.class })
public class So56044105ApplicationTests {

    @Autowired
    public Config config;

    @Test
    public void test() throws InterruptedException {
        assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(config.received.get(0)).isEqualTo("foo");
    }

    @Configuration
    public static class Config implements ConsumerSeekAware {

        List<String> received = new ArrayList<>();

        CountDownLatch latch = new CountDownLatch(3);

        @KafkaListener(id = "so56044105", topics = "so56044105")
        public void listen(String in) {
            System.out.println(in);
            this.received.add(in);
            this.latch.countDown();
        }

        @Override
        public void registerSeekCallback(ConsumerSeekCallback callback) {
        }

        @Override
        public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
            System.out.println("Seeking to beginning");
            assignments.keySet().forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
        }

        @Override
        public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        }

    }

}
...