Пример конфигурации для соединений по внешнему ключу с использованием Spring-Kafka и Confluent Registry - PullRequest
0 голосов
/ 25 февраля 2020

Мне уже удалось сделать несколько соединений Avro KStream с Spring Cloud Stream, и теперь я пытаюсь использовать новое соединение с внешним ключом между двумя Ktables с, как мне показалось, более простым Spring-Kafka

Вот конфигурация:

spring.kafka.streams.bootstrap-servers=XXX.XXX.XXX.XXX:9092
spring.kafka.streams.application-id=hubbit
spring.kafka.properties.schema.registry.url=http://XXX.XXX.XXX.XXX:8081
spring.kafka.streams.properties.default.key.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.kafka.streams.properties.default.value.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.auto-offset-reset=latest
ficexp.tools.topic.name=ficexp1.ficexp.tools
ficexp.tools_types.topic.name=ficexp1.ficexp.tools_types
ficexp.enriched_tools.topic.name=ficexp1.ficexp.enriched_tools

и мой основной класс.

@SpringBootApplication
@EnableKafka
@EnableKafkaStreams
public class StreamsApplication {
    public static void main(String[] args) {
        SpringApplication.run(StreamsApplication.class, args);
    }
    @Value("${ficexp.tools.topic.name}")
    private String tools ;
    @Value("${ficexp.tools_types.topic.name}")
    private String tools_types ;
    @Value("${ficexp.enriched_tools.topic.name}")
    private String enriched_tools ;
    Function<ficexp1.ficexp.tools.Envelope,ficexp1.ficexp.tools_types.Key> foreignKeyExtractor =
            (ficexp1.ficexp.tools.Envelope x) -> {
                return ficexp1.ficexp.tools_types.Key.newBuilder().setId(x.getAfter().getToolsTypeId()).build();
            };
    public class EnrichedToolJoiner implements ValueJoiner<ficexp1.ficexp.tools_types.Envelope, ficexp1.ficexp.tools.Envelope, ficexp1.ficexp.tools.enriched>
    {
        @Override
        public ficexp1.ficexp.tools.enriched apply(ficexp1.ficexp.tools_types.Envelope tools_types,ficexp1.ficexp.tools.Envelope tools)
        {
            return ficexp1.ficexp.tools.enriched.newBuilder().
                    setId(tools.getAfter().getId())
                    .setLib(tools.getAfter().getLib())
                    .setValid(tools.getAfter().getValid())
                    .setToolsTypeId(tools.getAfter().getToolsTypeId())
                    .setTypelib(tools_types.getAfter().getLib())
                    .build();
        }
    }
    final EnrichedToolJoiner enrichedToolJoiner = new EnrichedToolJoiner();
    @Bean
    public KTable<ficexp1.ficexp.tools.Key, ficexp1.ficexp.tools.enriched> enriched_toolsTable(StreamsBuilder kStreamBuilder) {
        KTable<ficexp1.ficexp.tools_types.Key, ficexp1.ficexp.tools_types.Envelope> tools_typesTable = kStreamBuilder.table(tools_types);
        KTable<ficexp1.ficexp.tools.Key,ficexp1.ficexp.tools.Envelope > toolsTable = kStreamBuilder.table(tools);
        KTable<ficexp1.ficexp.tools.Key, ficexp1.ficexp.tools.enriched> enrichedToolTable = toolsTable.leftJoin(
                tools_typesTable,
                foreignKeyExtractor,
                (ValueJoiner) enrichedToolJoiner
        );
        return enrichedToolTable;
    }
}

(я знаю, что он не закончен), но он не работает. Группа потребителей зарегистрирована в брокере Kafka, и это, кажется, единственное, что работает.

Ошибка, похоже, связана с Serde (org. apache .kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde )

Я совсем потерян, какая-нибудь зацепка?

2020-02-25 13:45:50.181 ERROR 15496 --- [           main] o.s.boot.SpringApplication               : Application run failed

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'enriched_toolsTable' defined in ubx.hub.streams.StreamsApplication: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.apache.kafka.streams.kstream.KTable]: Factory method 'enriched_toolsTable' threw exception; nested exception is java.lang.NullPointerException
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:656) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:636) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1338) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1177) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:879) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) [spring-boot-2.3.0.BUILD-20200225.112934-316.jar:2.3.0.BUILD-SNAPSHOT]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) [spring-boot-2.3.0.BUILD-20200225.112934-316.jar:2.3.0.BUILD-SNAPSHOT]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.3.0.BUILD-20200225.112934-316.jar:2.3.0.BUILD-SNAPSHOT]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) [spring-boot-2.3.0.BUILD-20200225.112934-316.jar:2.3.0.BUILD-SNAPSHOT]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-2.3.0.BUILD-20200225.112934-316.jar:2.3.0.BUILD-SNAPSHOT]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.3.0.BUILD-20200225.112934-316.jar:2.3.0.BUILD-SNAPSHOT]
    at ubx.hub.streams.StreamsApplication.main(StreamsApplication.java:44) [classes/:na]
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.apache.kafka.streams.kstream.KTable]: Factory method 'enriched_toolsTable' threw exception; nested exception is java.lang.NullPointerException
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:651) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    ... 19 common frames omitted
Caused by: java.lang.NullPointerException: null
    at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde.<init>(SubscriptionWrapperSerde.java:31) ~[kafka-streams-2.4.0.jar:na]
    at org.apache.kafka.streams.kstream.internals.KTableImpl.doJoinOnForeignKey(KTableImpl.java:956) ~[kafka-streams-2.4.0.jar:na]
    at org.apache.kafka.streams.kstream.internals.KTableImpl.leftJoin(KTableImpl.java:891) ~[kafka-streams-2.4.0.jar:na]
    at ubx.hub.streams.StreamsApplication.enriched_toolsTable(StreamsApplication.java:134) [classes/:na]
    at ubx.hub.streams.StreamsApplication$$EnhancerBySpringCGLIB$$3c0af961.CGLIB$enriched_toolsTable$0(<generated>) ~[classes/:na]
    at ubx.hub.streams.StreamsApplication$$EnhancerBySpringCGLIB$$3c0af961$$FastClassBySpringCGLIB$$8469ed4.invoke(<generated>) ~[classes/:na]
    at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244) ~[spring-core-5.2.4.BUILD-20200225.100750-121.jar:5.2.4.BUILD-SNAPSHOT]
    at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331) ~[spring-context-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    at ubx.hub.streams.StreamsApplication$$EnhancerBySpringCGLIB$$3c0af961.enriched_toolsTable(<generated>) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_241]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_241]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_241]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_241]
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
    ... 20 common frames omitted

РЕДАКТИРОВАТЬ: об ошибке соединения foreignKey, упомянутой Матиасом Дж. Саксом (KAFKA-9517), соединение было изменено на

        SpecificAvroSerde<ficexp1.ficexp.tools.Key> kSerde = new SpecificAvroSerde<ficexp1.ficexp.tools.Key>();
        SpecificAvroSerde<ficexp1.ficexp.tools.enriched> vSerde = new SpecificAvroSerde<ficexp1.ficexp.tools.enriched>();
        Materialized materialized =
                Materialized.as("toto").withValueSerde( (Serde) vSerde)
                        .withKeySerde( (Serde) kSerde );
        KTable<ficexp1.ficexp.tools.Key, ficexp1.ficexp.tools.enriched> enrichedToolTable = toolsTable.join(
                tools_typesTable,
                foreignKeyExtractor,
                (ValueJoiner) enrichedToolJoiner,
                ( Materialized ) materialized
                );

и это приводит к той же ошибке

...