EmbeddedKafka AdminClient завершает работу до запуска приложения Spring для тестирования - PullRequest
0 голосов
/ 05 ноября 2018

Я пытаюсь написать интеграционные тесты для приложения Spring Kafka (Spring Boot 2.0.6, Spring Kafka 2.1.10) и вижу множество экземпляров INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x166e432ebec0001 type:create cxid:0x5e zxid:0x24 txntype:-1 reqpath:n/a Error Path:/brokers/topics/my-topic/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/my-topic/partitions и различные варианты пути (/brokers, /brokers/topics и т. Д.), Которые отображаются в журналах до запуска приложения Spring. AdminClient затем закрывается, и это сообщение регистрируется:

DEBUG org.apache.kafka.common.network.Selector - [SocketServer brokerId=0] Connection with /127.0.0.1 disconnected
java.io.EOFException: null
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:547)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:483)
at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
at kafka.network.Processor.poll(SocketServer.scala:575)
at kafka.network.Processor.run(SocketServer.scala:492)
at java.lang.Thread.run(Thread.java:748)

Я использую опцию запуска @ClassRule в тесте так:

@ClassRule
@Shared
private KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 'my-topic')

, автоматическое подключение KafkaTemplate и настройка свойств Spring для соединения на основе встроенных значений Kafka:

def setupSpec() {
    System.setProperty('spring.kafka.bootstrap-servers', embeddedKafka.getBrokersAsString());
    System.setProperty('spring.cloud.stream.kafka.binder.zkNodes', embeddedKafka.getZookeeperConnectionString());
}

После запуска приложения Spring я снова вижу экземпляр сообщений KeeperException на уровне пользователя: o.a.z.server.PrepRequestProcessor : Got user-level KeeperException when processing sessionid:0x166e445836d0001 type:setData cxid:0x6b zxid:0x2b txntype:-1 reqpath:n/a Error Path:/config/topics/__consumer_offsets Error:KeeperErrorCode = NoNode for /config/topics/__consumer_offsets.

Есть идеи, где я иду не так? Я могу предоставить другую информацию о настройке и записать в журнал сообщения, но просто сделал обоснованное предположение о том, что поначалу может быть наиболее полезным.

Ответы [ 2 ]

0 голосов
/ 06 ноября 2018

@ Ответ Артема Билана поставил меня на правильный путь, поэтому спасибо ему за участие, и я смог выяснить это после просмотра других BlockingVariable статей и примеров. Я использовал BlockingVariable в ответе макета, а не в качестве обратного вызова. Когда ответ макета вызывается, установите для него значение true, и блок then просто сделает result.get(), и тест пройден.

@DirtiesContext
@ActiveProfiles('test')
@SpringBootTest
@Import(MockConfig.class)
class CustomListenerSpec extends TestSpecBase {

    @ClassRule
    @Shared
    private KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, TOPIC_NAME)

    @Autowired
    private KafkaTemplate<String, String> template

    @Autowired
    private SimpleService service

    final def TOPIC_NAME = 'my-topic'

    def setupSpec() {
        System.setProperty('spring.kafka.bootstrap-servers', embeddedKafka.getBrokersAsString());
    }

    def 'Sample test'() {
        def testMessagePayload = "Test message"
        def message = MessageBuilder.withPayload(testMessagePayload).setHeader(KafkaHeaders.TOPIC, TOPIC_NAME).build()
        def result = new BlockingVariable<Boolean>(5)
        service.handleMessage(_ as String) >> {
            result.set(true)
        }

        when: 'We put a message on the topic'
        template.send(message)

        then: 'the service should be called'
        result.get()
    }
}
0 голосов
/ 05 ноября 2018

Я не знаком со Споком, но я знаю, что метод @KafkaListener вызывается в его собственном потоке, поэтому вы не можете просто установить его непосредственно в блоке then:.

Вам нужно как-то обеспечить блокировку ожидания в вашем тестовом примере.

Я пытался с BlockingVariable против реальной службы не издеваться, и я вижу в логах ваш println(message). Но это BlockingVariable все еще не работает для меня как-то:

@DirtiesContext
@SpringBootTest(classes = [KafkaIntTestApplication.class])
@ActiveProfiles('test')
class CustomListenerSpec  extends Specification {

    @ClassRule
    @Shared
    public KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, 'my-topic')

    @Autowired
    private KafkaTemplate<String, String> template

    @SpyBean
    private SimpleService service

    final def TOPIC_NAME = 'my-topic'

    def setupSpec() {
        System.setProperty('spring.kafka.bootstrapServers', embeddedKafka.getBrokersAsString());
    }

    def 'Sample test'() {
        given:
        def testMessagePayload = "Test message"
        def message = MessageBuilder.withPayload(testMessagePayload).setHeader(KafkaHeaders.TOPIC, TOPIC_NAME).build()
        def result = new BlockingVariable<Boolean>(5)
        service.handleMessage(_) >> {
            result.set(true)
        }

        when: 'We put a message on the topic'
        template.send(message)

        then: 'the service should be called'
        result.get()
    }
}

И журналы такие:

2018-11-05 13:38:51.089  INFO 8888 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [my-topic-0, my-topic-1]
Test message

BlockingVariable.get() timed out after 5,00 seconds

    at spock.util.concurrent.BlockingVariable.get(BlockingVariable.java:113)
    at com.example.CustomListenerSpec.Sample test(CustomListenerSpec.groovy:54)

2018-11-05 13:38:55.917  INFO 8888 --- [           main] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@11ebb1b6: startup date [Mon Nov 05 13:38:49 EST 2018]; root of context hierarchy

Также мне пришлось добавить эту зависимость:

testImplementation "org.hamcrest:hamcrest-core"

UPDATE

OK. Реальная проблема в том, что MockConfig не было видно для конфигурации тестового контекста, и что @Import(MockConfig.class) делает свое дело. Где @Primary также дает нам дополнительный сигнал о том, какой боб собирать для инъекции в тестовом классе.

...