Не удалось сбросить (flush) хранилище состояний (Spring Cloud Stream с Kafka Streams Binder) - PullRequest
0 голосов
/ 06 июня 2018

Я пытаюсь создать простое потоковое приложение, которое читает сообщения журнала в формате JSON и подсчитывает количество записей каждого типа. Приложение падает с NullPointerException — по-видимому, не удаётся получить подходящий MessageConverter.

Должен ли я использовать другой тип содержимого (content type) или другой serde? Буду благодарен за любую помощь.

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.0.2.RELEASE)
2018-06-06 14:03:26.876  INFO 1 --- [           main] c.c.m.e.e.l.LogEventConsumerApplication  : Starting LogEventConsumerApplication v0.0.1-SNAPSHOT on logevent-stream-consumer-14-xx17g with PID 1 (/deployments/logevent-stream-consumer-0.0.1-SNAPSHOT.jar started by ? in /deployments)
2018-06-06 14:03:26.889  INFO 1 --- [           main] c.c.m.e.e.l.LogEventConsumerApplication  : No active profile set, falling back to default profiles: default
2018-06-06 14:03:27.047  INFO 1 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@358ee631: startup date [Wed Jun 06 14:03:27 CST 2018]; root of context hierarchy
2018-06-06 14:03:28.457  INFO 1 --- [           main] o.s.i.config.IntegrationRegistrar        : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2018-06-06 14:03:29.186  INFO 1 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2018-06-06 14:03:29.192  INFO 1 --- [           main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2018-06-06 14:03:29.422  INFO 1 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.kafka.annotation.KafkaBootstrapConfiguration' of type [org.springframework.kafka.annotation.KafkaBootstrapConfiguration$$EnhancerBySpringCGLIB$$3a04a801] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2018-06-06 14:03:29.594  INFO 1 --- [           main] trationDelegate$BeanPostProcessorChecker : Bean 'org.springframework.integration.config.IntegrationManagementConfiguration' of type [org.springframework.integration.config.IntegrationManagementConfiguration$$EnhancerBySpringCGLIB$$d14451ad] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2018-06-06 14:03:30.599  INFO 1 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService  'taskScheduler'
Exception in thread "default-group-92b2551e-6d84-47a1-9255-2ccc7be847b6-StreamThread-1" org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Failed to flush state store default-group-counts-store
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245)
    at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:327)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:307)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:302)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:292)
    at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:452)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:381)
    at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:310)
    at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: java.lang.NullPointerException
    at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsMessageConversionDelegate.lambda$serializeOnOutbound$0(KafkaStreamsMessageConversionDelegate.java:86)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
    at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
    at org.apache.kafka.streams.kstream.internals.ForwardingCacheFlushListener.apply(ForwardingCacheFlushListener.java:42)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.maybeForward(CachingWindowStore.java:115)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.access$300(CachingWindowStore.java:36)
    at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:99)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
    at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99)
    at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127)
    at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:132)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:128)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:242)
    ... 14 more

application.properties

spring.cloud.stream.bindings.es4xx5xxError.destination=es4xx5xx_error
spring.cloud.stream.bindings.es4xx5xxError.group=default-group
spring.cloud.stream.bindings.errorEvent.destination=test-event-x
spring.cloud.stream.bindings.errorEvent.group=default-group
spring.cloud.stream.kafka.streams.binder.brokers=localhost
spring.cloud.stream.kafka.streams.binder.zkNodes=localhost
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.timeWindow.length=${TIME_WINDOW_MS:60000}
spring.cloud.stream.kafka.streams.timeWindow.advanceBy=${TIME_ADVANCE_MS:60000}

Процессор

    // Window definition injected by Spring; process() passes it to windowedBy(...).
    @Autowired
    private TimeWindows window;

    // Value of the errorEvent binding's "group" property; process() uses it as the
    // prefix of the state store name (group + "-counts-store").
    @Value("${spring.cloud.stream.bindings.errorEvent.group}")
    private String group;

    /**
     * Counts incoming records per response code within each time window.
     *
     * <p>Each value is reduced to its response code rendered as a string, the stream is
     * re-keyed by that code, windowed with the injected {@code window}, and counted into a
     * state store named {@code group + "-counts-store"}. Every windowed count is then emitted
     * as a record with a {@code null} key whose value is built by {@code getCodeCount}
     * (defined elsewhere in this class).
     *
     * @param data stream bound to the {@code es4xx5xxError} input
     * @return stream of per-window counts, sent to the {@code errorEvent} output
     */
    @StreamListener("es4xx5xxError")
    @SendTo("errorEvent")
    public KStream<?, CodeCount> process(KStream<?, Source> data) {
        final String countsStoreName = group + "-counts-store";

        // Keep only the response code, as a string, so it can serve as the grouping key.
        final KStream<?, String> responseCodes =
                data.mapValues(source -> source.getResponseCode() + "");

        return responseCodes
                .groupBy((originalKey, responseCode) -> responseCode)
                .windowedBy(window)
                .count(Materialized.as(countsStoreName))
                .toStream()
                // The outgoing record carries no key; all information lives in the value.
                .map((windowedCode, total) -> new KeyValue<>(null, getCodeCount(windowedCode, total)));
    }
...