java.lang.ClassCastException: класс com.sun.proxy.$Proxy143 не может быть приведён к классу org.springframework.messaging.MessageChannel (... находятся в неназванном модуле загрузчика 'app') - PullRequest
0 голосов
/ 30 января 2020

Я пишу тесты для приложения Spring Cloud Stream. Оно читает KStream из топика A. В тесте я использую KafkaTemplate для публикации сообщений и ожидаю появления записей журнала от KStream.

Тесты выдают следующее исключение:

java.lang.ClassCastException: class com.sun.proxy.$Proxy143 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy143 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')
    at org.springframework.cloud.stream.test.binder.TestSupportBinder.bindConsumer(TestSupportBinder.java:66) ~[spring-cloud-stream-test-support-3.0.1.RELEASE.jar:3.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:169) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:115) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:112) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]

Это исключение не появляется при обычном запуске приложения.

KSTREAM:

/**
 * Spring configuration that exposes the Kafka Streams consumer function `processSomething`.
 */
@Configuration
class MyKStream() {

    private val logger = LoggerFactory.getLogger(javaClass)

    /**
     * Builds the consumer function for the incoming stream.
     *
     * The returned [Consumer] attaches a `foreach` to the stream it receives and logs the key
     * and value of every record at INFO level.
     *
     * @return a consumer that logs each record of the supplied [KStream]
     */
    @Bean
    fun processSomething(): Consumer<KStream<XX, XX>> {
        return Consumer { something ->
            something.foreach { key, value ->
                logger.info("--------> Processing xxx key {} - value {}", key, value)
            }
        }
    }
}

ТЕСТ:

/**
 * Spring Boot test that runs against an embedded Kafka broker: it publishes a single message
 * and then keeps the test alive so that the stream's log output can be observed.
 */
@TestInstance(PER_CLASS)
@EmbeddedKafka
@SpringBootTest(
    properties = [
        "spring.profiles.active=local",
        "schema-registry.user=",
        "schema-registry.password=",
        "spring.cloud.stream.bindings.processSomething-in-0.destination=topicA",
        "spring.cloud.stream.bindings.processSomething-in-0.producer.useNativeEncoding=true",
        "spring.cloud.stream.bindings.processSomethingElse-in-0.destination=topicB",
        "spring.cloud.stream.bindings.processSomethingElse-in-0.producer.useNativeEncoding=true",
        "spring.cloud.stream.kafka.streams.binder.configuration.application.server=localhost:8080",
        "spring.cloud.stream.function.definition=processSomething;processSomethingElse"
    ]
)
class MyKStreamTests {

    private val logger = LoggerFactory.getLogger(javaClass)

    @Autowired
    private lateinit var embeddedKafka: EmbeddedKafkaBroker

    @Autowired
    private lateinit var schemaRegistryMock: SchemaRegistryMock

    /**
     * Stops the embedded brokers in two passes: first every server is asked to shut down,
     * then each one is awaited.
     */
    @AfterAll
    fun afterAll() {
        for (server in embeddedKafka.kafkaServers) {
            server.shutdown()
        }
        for (server in embeddedKafka.kafkaServers) {
            server.awaitShutdown()
        }
    }

    /**
     * Sends one message with an explicit key and timestamp header, blocks until the send
     * completes, and then sleeps so the consumer side has time to log the record.
     */
    @Test
    fun `should send and process something`() {
        val template = createProducer()
        logger.debug("**********----> presend")

        val builder = MessageBuilder.withPayload(xxx)
        builder.setHeader(KafkaHeaders.MESSAGE_KEY, xxx)
        builder.setHeader(KafkaHeaders.TIMESTAMP, 1L)
        val message = builder.build()

        // Block on the returned future so the send has completed before the next log line.
        val pendingSend = template.send(message)
        pendingSend.get()
        logger.debug("**********----> sent")

        // Keep the test thread alive for 100 seconds; there is no assertion in this test.
        Thread.sleep(100_000)
    }
}

/**
 * Test configuration that points the Kafka and schema-registry settings at in-process test
 * doubles by writing JVM system properties, and exposes the schema registry mock as a bean.
 *
 * @property embeddedKafkaBroker broker whose address list is published to the system properties
 */
@Configuration
class KafkaTestConfiguration(private val embeddedKafkaBroker: EmbeddedKafkaBroker) {

    private val schemaRegistryMock = SchemaRegistryMock()

    /**
     * Publishes the embedded broker addresses, starts the schema registry mock, and then
     * publishes the mock's URL. The mock is started before its URL is read.
     */
    @PostConstruct
    fun init() {
        System.setProperty(
            "spring.kafka.bootstrap-servers",
            embeddedKafkaBroker.brokersAsString
        )
        System.setProperty(
            "spring.cloud.stream.kafka.streams.binder.brokers",
            embeddedKafkaBroker.brokersAsString
        )

        schemaRegistryMock.start()
        val registryUrl = schemaRegistryMock.url
        System.setProperty(
            "spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url",
            registryUrl
        )
    }

    /** Exposes the single mock instance owned by this configuration as a Spring bean. */
    @Bean
    fun schemaRegistryMock(): SchemaRegistryMock = schemaRegistryMock

    /** Stops the schema registry mock when this configuration is destroyed. */
    @PreDestroy
    fun preDestroy() {
        schemaRegistryMock.stop()
    }
}
...