I have already managed to build several Avro KStream joins with Spring Cloud Stream, and I am now trying to use the new foreign-key join between two KTables with what I thought would be the simpler option, Spring-Kafka.
Here is the configuration:
spring.kafka.streams.bootstrap-servers=XXX.XXX.XXX.XXX:9092
spring.kafka.streams.application-id=hubbit
spring.kafka.properties.schema.registry.url=http://XXX.XXX.XXX.XXX:8081
spring.kafka.streams.properties.default.key.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.kafka.streams.properties.default.value.serde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.kafka.streams.properties.default.deserialization.exception.handler=org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
spring.kafka.streams.properties.state.dir=/var/lib/kafka-streams
spring.kafka.streams.properties.auto-offset-reset=latest
ficexp.tools.topic.name=ficexp1.ficexp.tools
ficexp.tools_types.topic.name=ficexp1.ficexp.tools_types
ficexp.enriched_tools.topic.name=ficexp1.ficexp.enriched_tools
and my main class:
@SpringBootApplication
@EnableKafka
@EnableKafkaStreams
public class StreamsApplication {
    public static void main(String[] args) {
        SpringApplication.run(StreamsApplication.class, args);
    }
    @Value("${ficexp.tools.topic.name}")
    private String tools;
    @Value("${ficexp.tools_types.topic.name}")
    private String tools_types;
    @Value("${ficexp.enriched_tools.topic.name}")
    private String enriched_tools;
    Function<ficexp1.ficexp.tools.Envelope, ficexp1.ficexp.tools_types.Key> foreignKeyExtractor =
            (ficexp1.ficexp.tools.Envelope x) -> {
                return ficexp1.ficexp.tools_types.Key.newBuilder().setId(x.getAfter().getToolsTypeId()).build();
            };
    public class EnrichedToolJoiner implements ValueJoiner<ficexp1.ficexp.tools_types.Envelope, ficexp1.ficexp.tools.Envelope, ficexp1.ficexp.tools.enriched>
    {
        @Override
        public ficexp1.ficexp.tools.enriched apply(ficexp1.ficexp.tools_types.Envelope tools_types, ficexp1.ficexp.tools.Envelope tools)
        {
            return ficexp1.ficexp.tools.enriched.newBuilder()
                    .setId(tools.getAfter().getId())
                    .setLib(tools.getAfter().getLib())
                    .setValid(tools.getAfter().getValid())
                    .setToolsTypeId(tools.getAfter().getToolsTypeId())
                    .setTypelib(tools_types.getAfter().getLib())
                    .build();
        }
    }
    final EnrichedToolJoiner enrichedToolJoiner = new EnrichedToolJoiner();
    @Bean
    public KTable<ficexp1.ficexp.tools.Key, ficexp1.ficexp.tools.enriched> enriched_toolsTable(StreamsBuilder kStreamBuilder) {
        KTable<ficexp1.ficexp.tools_types.Key, ficexp1.ficexp.tools_types.Envelope> tools_typesTable = kStreamBuilder.table(tools_types);
        KTable<ficexp1.ficexp.tools.Key, ficexp1.ficexp.tools.Envelope> toolsTable = kStreamBuilder.table(tools);
        KTable<ficexp1.ficexp.tools.Key, ficexp1.ficexp.tools.enriched> enrichedToolTable = toolsTable.leftJoin(
                tools_typesTable,
                foreignKeyExtractor,
                (ValueJoiner) enrichedToolJoiner
        );
        return enrichedToolTable;
    }
}
(I know it is not finished), but it does not work. The consumer group gets registered on the Kafka broker, and that seems to be the only thing that works.
The error seems to be related to a Serde (org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde).
I am completely lost; any clue?
2020-02-25 13:45:50.181 ERROR 15496 --- [ main] o.s.boot.SpringApplication : Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'enriched_toolsTable' defined in ubx.hub.streams.StreamsApplication: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.apache.kafka.streams.kstream.KTable]: Factory method 'enriched_toolsTable' threw exception; nested exception is java.lang.NullPointerException
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:656) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:636) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1338) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1177) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:557) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:517) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:323) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:321) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:879) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:878) ~[spring-context-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:550) ~[spring-context-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) [spring-boot-2.3.0.BUILD-20200225.112934-316.jar:2.3.0.BUILD-SNAPSHOT]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) [spring-boot-2.3.0.BUILD-20200225.112934-316.jar:2.3.0.BUILD-SNAPSHOT]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.3.0.BUILD-20200225.112934-316.jar:2.3.0.BUILD-SNAPSHOT]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315) [spring-boot-2.3.0.BUILD-20200225.112934-316.jar:2.3.0.BUILD-SNAPSHOT]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) [spring-boot-2.3.0.BUILD-20200225.112934-316.jar:2.3.0.BUILD-SNAPSHOT]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) [spring-boot-2.3.0.BUILD-20200225.112934-316.jar:2.3.0.BUILD-SNAPSHOT]
at ubx.hub.streams.StreamsApplication.main(StreamsApplication.java:44) [classes/:na]
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.apache.kafka.streams.kstream.KTable]: Factory method 'enriched_toolsTable' threw exception; nested exception is java.lang.NullPointerException
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:651) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
... 19 common frames omitted
Caused by: java.lang.NullPointerException: null
at org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde.<init>(SubscriptionWrapperSerde.java:31) ~[kafka-streams-2.4.0.jar:na]
at org.apache.kafka.streams.kstream.internals.KTableImpl.doJoinOnForeignKey(KTableImpl.java:956) ~[kafka-streams-2.4.0.jar:na]
at org.apache.kafka.streams.kstream.internals.KTableImpl.leftJoin(KTableImpl.java:891) ~[kafka-streams-2.4.0.jar:na]
at ubx.hub.streams.StreamsApplication.enriched_toolsTable(StreamsApplication.java:134) [classes/:na]
at ubx.hub.streams.StreamsApplication$$EnhancerBySpringCGLIB$$3c0af961.CGLIB$enriched_toolsTable$0(<generated>) ~[classes/:na]
at ubx.hub.streams.StreamsApplication$$EnhancerBySpringCGLIB$$3c0af961$$FastClassBySpringCGLIB$$8469ed4.invoke(<generated>) ~[classes/:na]
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244) ~[spring-core-5.2.4.BUILD-20200225.100750-121.jar:5.2.4.BUILD-SNAPSHOT]
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:331) ~[spring-context-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
at ubx.hub.streams.StreamsApplication$$EnhancerBySpringCGLIB$$3c0af961.enriched_toolsTable(<generated>) ~[classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_241]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_241]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_241]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_241]
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154) ~[spring-beans-5.2.4.BUILD-20200225.100750-119.jar:5.2.4.BUILD-SNAPSHOT]
... 20 common frames omitted
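Reading the trace from the innermost cause, the NullPointerException is raised inside the SubscriptionWrapperSerde constructor, reached from KTableImpl.doJoinOnForeignKey through leftJoin in the enriched_toolsTable bean method. The failure therefore happens while the topology is being built, before any record is read. The sketch below is a stdlib-only model of that failure mode, not Kafka's code: SimpleSerde and WrapperSerde are hypothetical stand-ins, and it assumes (the trace alone does not confirm this) that the constructor dereferences a key serde that was never set on the left table.

```java
// Stdlib-only model; none of these classes exist in Kafka Streams.
class NullSerdeDemo {

    // Hypothetical stand-in for a serde: it only exposes a serializer name.
    static final class SimpleSerde {
        private final String name;

        SimpleSerde(String name) {
            this.name = name;
        }

        String serializer() {
            return name + "-serializer";
        }
    }

    // Hypothetical wrapper that dereferences its delegate eagerly in the constructor.
    static final class WrapperSerde {
        private final String innerSerializer;

        WrapperSerde(SimpleSerde primaryKeySerde) {
            // Assumption: this mirrors the failing constructor; a null delegate fails here.
            this.innerSerializer = primaryKeySerde.serializer();
        }

        String describe() {
            return "wrapper(" + innerSerializer + ")";
        }
    }

    // Returns true when building the wrapper fails with a NullPointerException.
    static boolean failsAtConstruction(SimpleSerde keySerde) {
        try {
            new WrapperSerde(keySerde);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A table created without an explicit key serde is modelled as null.
        System.out.println(failsAtConstruction(null));
        // A table created with an explicit key serde.
        System.out.println(failsAtConstruction(new SimpleSerde("avro")));
    }
}
```

If that assumption holds, one thing to try is giving the input tables explicit serdes, for example through Consumed.with(keySerde, valueSerde) on StreamsBuilder.table(...), so that the join does not have to rely on the configured default serdes. A SpecificAvroSerde created with new also has to be configured with the schema registry URL before use.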
EDIT: regarding the foreign-key join bug mentioned by Matthias J. Sax (KAFKA-9517), the join was changed to
SpecificAvroSerde<ficexp1.ficexp.tools.Key> kSerde = new SpecificAvroSerde<ficexp1.ficexp.tools.Key>();
SpecificAvroSerde<ficexp1.ficexp.tools.enriched> vSerde = new SpecificAvroSerde<ficexp1.ficexp.tools.enriched>();
Materialized materialized =
        Materialized.as("toto")
                .withValueSerde((Serde) vSerde)
                .withKeySerde((Serde) kSerde);
KTable<ficexp1.ficexp.tools.Key, ficexp1.ficexp.tools.enriched> enrichedToolTable = toolsTable.join(
        tools_typesTable,
        foreignKeyExtractor,
        (ValueJoiner) enrichedToolJoiner,
        (Materialized) materialized
);
and this leads to the same error.
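Separately from the NullPointerException, the raw (ValueJoiner) cast in the join call hides a type mismatch. The foreign-key join on KTable passes the joiner the value of the table it is called on first (here the tools envelope) and the other table's value second, while EnrichedToolJoiner declares the tools_types envelope first. The raw cast lets this compile, so the problem would only surface once records are joined. The sketch below models the effect with java.util.function.BiFunction in place of ValueJoiner; Tool, ToolType and joinOne are hypothetical stand-ins, not part of the application or of Kafka Streams.

```java
import java.util.function.BiFunction;

// Stdlib-only model of a joiner whose parameter order is hidden by a raw cast.
class JoinerOrderDemo {

    // Hypothetical stand-ins for the two Avro value types.
    static final class Tool {
        final String lib;

        Tool(String lib) {
            this.lib = lib;
        }
    }

    static final class ToolType {
        final String lib;

        ToolType(String lib) {
            this.lib = lib;
        }
    }

    // Models a join that passes the left value first, then the other table's value.
    static <V, VO, VR> VR joinOne(V left, VO other, BiFunction<V, VO, VR> joiner) {
        return joiner.apply(left, other);
    }

    // Declared with the parameters swapped relative to what joinOne passes.
    static final BiFunction<ToolType, Tool, String> SWAPPED =
            (type, tool) -> tool.lib + "/" + type.lib;

    // Declared in the order joinOne passes them: left (Tool) first.
    static final BiFunction<Tool, ToolType, String> ORDERED =
            (tool, type) -> tool.lib + "/" + type.lib;

    // The raw cast silences the compiler; the mismatch shows up only when apply runs.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String joinWithRawCast(Tool tool, ToolType type) {
        return (String) joinOne(tool, type, (BiFunction) SWAPPED);
    }

    public static void main(String[] args) {
        Tool tool = new Tool("hammer");
        ToolType type = new ToolType("hand tool");
        System.out.println(joinOne(tool, type, ORDERED));
        try {
            joinWithRawCast(tool, type);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException at runtime");
        }
    }
}
```

Declaring the joiner with the left table's value type first and dropping the raw cast would let the compiler check the order. With leftJoin the second argument can also be null when no matching row exists, so the joiner would need to handle that case.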