Получение исключений во время теста КафкаТест, хотя тест пройден - PullRequest
0 голосов
/ 04 октября 2019

Я запускаю тест Junit с использованием EmbeddedKafka, я могу производить и использовать для брокеров встроенного Kafka и успешно подтверждать отправленные мной данные.

Но в стеке я вижу много исключенийtrace, которая приходит после того, как утверждение выполнено.

1)

java.io.IOException: подключение к 0 было отключено до того, как ответ был прочитан в org.apache.kafka.clients.NetworkClientUtils.sendAndReceive (NetworkClientUtils.java:97) ~ [kafka-clients-2.0.1.jar: na] в kafka.controller.RequestSendThread.doWork (ControllerChannelManager.scala: 240) ~ [kafka_2.11-2.0.1.jar:na] at kafka.utils.ShutdownableThread.run (ShutdownableThread.scala: 82) [kafka_2.11-2.0.1.jar: na]

2)

2019-10-04 15: 49: 27.123 ПРЕДУПРЕЖДЕНИЕ 1812 --- [r-0-send-thread] kafka.controller.RequestSendThread: [RequestSendThread controllerId = 0] Соединение контроллера 0 с локальным хостом брокера: 54745 (id: 0 rack: null) было неудачным

java.net.SocketTimeoutException: не удалось установитьв течение 1000 мс на kafka.controller.RequestSendThread.brokerReady (ControllerChannelManager.scala: 280) [kafka_2.11-2.0.1.jar: na] на kafka.controller.RequestSendThread.doWork (ControllerChannelManager.scala 23). 23:11-2.0.1.jar: na] at kafka.utils.ShutdownableThread.run (ShutdownableThread.scala: 82) [kafka_2.11-2.0.1.jar: na]

3)

java.io.IOException: Соединение с локальным хостом: 54745 (id: 0 rack: null) не удалось. в org.apache.kafka.clients.NetworkClientUtils.awaitReady (NetworkClientUtils.java:70) ~ [kafka-clients-2.0.1.jar: na] в kafka.controller.RequestSendThread.brokerReady (ControllerChannelManager.scala_ 27: 27).11-2.0.1.jar: na] в kafka.controller.RequestSendThread.doWork (ControllerChannelManager.scala: 233) [kafka_2.11-2.0.1.jar: na] в kafka.utils.ShutdownableThread.run (ShutdownableThread. scala: 82) [kafka_2.11-2.0.1.jar: na]

4)

java.lang.InterruptedException: null at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(Неизвестный источник) ~ [na: 1.8.0_181] в java.util.concurrent.CountDownLatch.await (Неизвестный источник) ~ [na: 1.8.0_181] в kafka.utils.ShutdownableThread.pause (ShutdownableThread.scala: 69) [kafka_2.11-2.0.1.jar: na] в kafka.controller.RequestSendThread.backoff $ 1 (ControllerChannelManager.scala: 221) ~ [kafka_2.11-2.0.1.jar: na] в kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala: 235) ~ [kafka_2.11-2.0.1.jar: na] at kafka.utils.ShutdownableThread.run (ShutdownableThread.scala: 82) [kafka_2.11-2.0.1.jar: na]

Мой тест:

 import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.support.serializer.JsonDeserializer;
    import org.springframework.kafka.test.context.EmbeddedKafka;
    import org.springframework.kafka.test.rule.EmbeddedKafkaRule;

    @RunWith(SpringRunner.class)
    @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT, classes = TestApplication.class)
    @ActiveProfiles("test")
    @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_CLASS)
    //@EmbeddedKafka(controlledShutdown = true)
    public class KafkaUtilTest {

        private static String TOPIC = "PE";

        @Autowired
        KafkaUtil kafkaUtil;

        @Autowired
        ConfigurationProperties configProperties;

        SchedulingCallRequestDTO request;
        ScheduleOrderResponseDTOv2 response;
        Consumer<String, Object> consumer;
        HashMap<String, String> expectedHeaderValueMap;

        @ClassRule
        public static EmbeddedKafkaRule embeddedKafkarule = new EmbeddedKafkaRule(1, true, TOPIC);

        @BeforeClass
        public static void setUpBeforeClass() throws Exception {
            System.setProperty("spring.kafka.producer.bootstrap-servers",
                    embeddedKafkarule.getEmbeddedKafka().getBrokersAsString());
        }

        @AfterClass
        public static void tearDown() {
            embeddedKafkarule.getEmbeddedKafka().getKafkaServers().forEach(b -> b.shutdown());
            embeddedKafkarule.getEmbeddedKafka().getKafkaServers().forEach(b -> b.awaitShutdown());
        }

        @Before
        public void init() {
            readFile("0211");
            expectedHeaderValueMap = getExpectedHeaderValueMap();
            Map<String, Object> consumerConfigs = new HashMap<>(
                    KafkaTestUtils.consumerProps("consumer", "true", embeddedKafkarule.getEmbeddedKafka()));
            consumerConfigs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerConfigs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
            consumerConfigs.put(JsonDeserializer.TRUSTED_PACKAGES, "com.adessa.promiseengine.dto.kafka");

            consumer = new DefaultKafkaConsumerFactory<String, Object>(consumerConfigs).createConsumer();

            List<String> topics = new ArrayList<>();
            topics.add(TOPIC);

            TopicPartition topicPartition1 = new TopicPartition(TOPIC, 0);
            TopicPartition topicPartition2 = new TopicPartition(TOPIC, 1);
            List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
            topicPartitions.add(topicPartition1);
            topicPartitions.add(topicPartition2);
            consumer.assign(topicPartitions);
            consumer.seekToBeginning(topicPartitions);

        }

        @Test
        public void testPublish() throws Exception {
            kafkaUtil.publishToKafka(request, response);
            kafkaUtil.kafkaTemplate.flush();
            ConsumerRecord<String, Object> consumedRecord = KafkaTestUtils.getSingleRecord(consumer, TOPIC);
            assertRecord(consumedRecord);
        }
        private void readFile(String testSequenceNo) {}

    }

Почему существуют эти исключения? И во время какого процесса они происходят? Пожалуйста, помогите

1 Ответ

1 голос
/ 10 октября 2019

Вы используете правило класса, и брокер срывается после завершения теста, но вы оставляете клиента открытым;вам нужно закрыть потребителя в конце теста.

...