Преобразование сообщений AvroSchema в приложении Spring Cloud Dataflow Sink - PullRequest
0 голосов
/ 21 мая 2018

Я пытаюсь использовать преобразование сообщений на основе схемы Avro в приложении Spring Cloud Stream, которое является приёмником (sink). Из ошибки я не могу понять, зачем нужен дополнительный канал/потребитель «redis-sink:0.input».

Я развернул его с помощью сервера SCDF в Kubernetes; у меня есть другие приложения, которые не используют преобразование на основе схемы Avro, и они работают без проблем.

 /**
 * Spring configuration for a sink application bound to {@link Sink}.
 *
 * <p>Declares three beans: a {@link ConfluentSchemaRegistryClient} pointed at the configured
 * endpoint, the {@link MessageHandler} attached to {@code Sink.INPUT} via
 * {@code @ServiceActivator}, and a {@link MessageConverter} built on the same registry client.
 */
@Configuration
@EnableConfigurationProperties({ RedisSinkProperties.class, EventAggregatorProperties.class })
@EnableBinding(Sink.class)
public class RedisSinkConfiguration {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private RedisSinkProperties redisSinkProperties;
    @Autowired
    private EventAggregatorProperties eventAggregatorProperties;

    // Disabled alternative: a plain @StreamListener consumer on the same input channel.
    // @StreamListener(Sink.INPUT)
    // public void receive(Object arg0) throws Exception {
    // System.out.println(eventAggregatorProperties);
    // System.out.println("this is working finally" + arg0);
    //
    // }

    /** Schema registry endpoint, resolved from {@code spring.cloud.stream.schemaRegistryClient.endpoint}. */
    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
    private String endpoint;

    /**
     * Creates the schema registry client and points it at {@link #endpoint}.
     *
     * @return the configured registry client
     */
    @Bean
    @Order(-100)
    public ConfluentSchemaRegistryClient confluentSchemaRegistryClient() {
        System.out.println("registry being initialized");
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }

    /**
     * Builds the handler that is subscribed to {@code Sink.INPUT} through {@code @ServiceActivator}.
     *
     * <p>The handler is an {@code EventAggregatorMessageHandler} constructed from the Redis
     * connection factory, the aggregator properties and the registry client, with logging enabled.
     *
     * @return the message handler for the sink's input channel
     */
    @Bean
    @ServiceActivator(inputChannel = Sink.INPUT)
    public MessageHandler redisSinkMessageHandler() {
        System.out.println(redisSinkProperties);
        System.out.println(redisConnectionFactory);
        System.out.println(eventAggregatorProperties);
        System.out.println("my thing is being picked now again");

        EventAggregatorMessageHandler eventHandler = new EventAggregatorMessageHandler(redisConnectionFactory,
                eventAggregatorProperties, confluentSchemaRegistryClient());
        eventHandler.setLoggingEnabled(true);

        return eventHandler;

        // Disabled alternative: choose a key / queue / topic handler from RedisSinkProperties.
        // }
        // if (redisSinkProperties.isKey()) {
        // RedisStoreWritingMessageHandler redisStoreWritingMessageHandler = new
        // RedisStoreWritingMessageHandler(
        // this.redisConnectionFactory);
        // redisStoreWritingMessageHandler.setKeyExpression(this.redisSinkProperties.keyExpression());
        // return redisStoreWritingMessageHandler;
        // }
        // else if (this.redisSinkProperties.isQueue()) {
        // return new
        // RedisQueueOutboundChannelAdapter(this.redisSinkProperties.queueExpression(),
        // this.redisConnectionFactory);
        // } else { // must be topic
        // RedisPublishingMessageHandler redisPublishingMessageHandler = new
        // RedisPublishingMessageHandler(
        // this.redisConnectionFactory);
        // redisPublishingMessageHandler.setTopicExpression(this.redisSinkProperties.topicExpression());
        // return redisPublishingMessageHandler;
        // }
        // }
    }

    /**
     * Creates the Avro message converter backed by the schema registry client, with dynamic
     * schema generation enabled.
     *
     * @return the configured converter
     * @throws IOException declared for backward compatibility with the original signature
     */
    @Bean
    public MessageConverter customMessageConverter() throws IOException {
        // Obtain the client through the @Bean method: this class declares no field or parameter
        // named confluentSchemaRegistryClient, so the bare identifier did not resolve.
        ConfluentSchemaRegistryClientMessageConverter converter = new ConfluentSchemaRegistryClientMessageConverter(
                confluentSchemaRegistryClient());
        converter.setDynamicSchemaGenerationEnabled(true);
        return converter;

    }

}

Свойства, переданные приложению:

    app.eventaggregator.spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets=true
app.eventaggregator.spring.cloud.stream.kafka.bindings.input.consumer.startOffset=latest
app.eventaggregator.spring.cloud.stream.schemaRegistryClient.endpoint=http://stage3-avro-schema-registry-schema-registry.kafka:8081
app.eventaggregator.spring.redis.host=redis.prod-environment
app.eventaggregator.spring.redis.database=1
app.eventaggregator.redis.isEventStream=TRUE
app.eventaggregator.redis.topic=test
app.eventaggregator.logging.level.=ERROR
app.eventaggregator.stage3.eventsToFilter=APP_LAUNCH,PAGE_VIEW
app.eventaggregator.stage3.ttlMs=36000,36000
app.eventaggregator.stage3.initialRandomValueStart=0,222
app.eventaggregator.stage3.initialRandomValueEnd=0,300
app.eventaggregator.stage3.redisKeyNames=APP_LAUNCH,PAGE_VIEW
app.eventaggregator.spring.cloud.stream.default.contentType=application/vnd.stagingClickStream.v1+avro
app.log.spring.cloud.stream.kafka.bindings.input.consumer.resetOffsets=true
app.log.spring.cloud.stream.kafka.bindings.input.consumer.startOffset=earliest
app.log.spring.cloud.stream.kafka.bindings.schemaRegistryClient.endpoint=http://stage3-avro-schema-registry-schema-registry.kafka:8081
app.log.logging.level.=ERROR

И я получаю следующую ошибку в журнале:

              .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.5.2.RELEASE)
 registry being initialized
RedisSinkProperties(parser=org.springframework.expression.spel.standard.SpelExpressionParser@34c01041, topicExpression=null, queueExpression=null, keyExpression=null, key=null, queue=null, topic=test, isEventStream=false)
org.springframework.data.redis.connection.jedis.JedisConnectionFactory@536f2a7e
EventAggregatorProperties(eventsToFilter=[APP_LAUNCH, PAGE_VIEW], ttlMs=[36000, 36000], initialRandomValueStart=[0, 222], initialRandomValueEnd=[0, 300], redisKeyNames=[APP_LAUNCH, PAGE_VIEW])
my thing is being picked
2018-05-21 08:47:56,699 ERROR -kafka-listener-1 o.s.k.l.LoggingErrorHandler:37 - Error while processing: ConsumerRecord(topic = stagingClickStream, partition = 1, offset = 0, key = null, value = [B@38002450)
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'redis-sink:0.input'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=[Payload byte[193]][Headers={kafka_offset=0, id=2e3128ed-bca6-761a-49e8-44db263ca439, kafka_receivedPartitionId=1, kafka_receivedTopic=stagingClickStream, contentType=application/vnd.stagingClickStream.v1+avro, timestamp=1526892476669}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:292) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
    at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:47) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:197) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:76) ~[spring-kafka-1.0.5.RELEASE.jar!/:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:286) ~[spring-retry-1.2.0.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:179) ~[spring-retry-1.2.0.RELEASE.jar!/:na]
    at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar!/:na]
        at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
        at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:70) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
        at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:64) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:171) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$000(KafkaMessageDrivenChannelAdapter.java:47) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na]
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:197) ~[spring-integration-kafka-2.0.1.RELEASE.jar!/:na]
        at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:76) ~[spring-kafka-1.0.5.RELEASE.jar!/:na]
        at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter$1.doWithRetry(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar!/:na]
        at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:286) ~[spring-retry-1.2.0.RELEASE.jar!/:na]
        at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:179) ~[spring-retry-1.2.0.RELEASE.jar!/:na]
        at org.springframework.kafka.listener.adapter.RetryingAcknowledgingMessageListenerAdapter.onMessage(RetryingAcknowledgingMessageListenerAdapter.java:71) ~[spring-kafka-1.0.5.RELEASE.jar!/:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:597) [spring-kafka-1.0.5.RELEASE.jar!/:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$1800(KafkaMessageListenerContainer.java:222) [spring-kafka-1.0.5.RELEASE.jar!/:na]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:772) [spring-kafka-1.0.5.RELEASE.jar!/:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_144]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_144]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_144]
    Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.8.RELEASE.jar!/:4.3.8.RELEASE]
        ... 29 common frames omitted

1 Ответ

0 голосов
/ 22 мая 2018

Там происходит что-то странное; не похоже, что фреймворк обнаруживает аннотацию @ServiceActivator.

Вскоре после

Creating shared instance of singleton bean 'redisSinkMessageHandler'

мы должны увидеть

Invoking afterPropertiesSet() on bean with name 'redisSinkConfiguration.redisSinkMessageHandler.serviceActivator'

Это конечная точка-потребитель для обработчика (именно она подписывается на канал).

Я вижу, что постпроцессор бинов, который создаёт этот бин, загружается в контекст, поэтому пока я понятия не имею, почему он не находит аннотацию.

Можете ли вы попробовать переместить этот бин в его собственный класс @Configuration без других аннотаций? Тем временем я попытаюсь воспроизвести проблему.

ПРАВКА

Я попытался воспроизвести проблему (и не смог) с помощью следующего кода ...

/**
 * Minimal Spring Boot application used to try to reproduce the problem: on startup it sends
 * a single message to the injected {@code input} channel.
 */
@SpringBootApplication
public class So50445264Application {

    /**
     * Launches the application.
     *
     * @param args command-line arguments passed through to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(So50445264Application.class, args);
    }

    /**
     * Registers a runner that sends one message with payload {@code "foo"} to the given channel.
     *
     * @param input the channel the test message is sent to
     * @return the runner bean
     */
    @Bean
    public ApplicationRunner runner(MessageChannel input) {
        return ignoredArgs -> {
            GenericMessage<String> probe = new GenericMessage<>("foo");
            input.send(probe);
        };
    }

}

и

/**
 * Reproduction configuration: binds {@link Sink}, enables {@link MyProperties}, and attaches
 * a handler to {@code Sink.INPUT} through {@code @ServiceActivator}.
 */
@Configuration
@EnableBinding(Sink.class)
@EnableConfigurationProperties(MyProperties.class)
public class MyConfig {

    @Autowired
    private MyProperties myProps;

    /**
     * Creates the handler for the input channel. For each message it prints the payload and
     * the configured {@code foo} value separated by a colon, and always returns {@code null}.
     *
     * @return the message handler bean
     */
    @ServiceActivator(inputChannel = Sink.INPUT)
    @Bean
    public MessageHandler mh() {
        AbstractReplyProducingMessageHandler printingHandler = new AbstractReplyProducingMessageHandler() {

            @Override
            protected Object handleRequestMessage(Message<?> requestMessage) {
                String line = requestMessage.getPayload() + ":" + myProps.getFoo();
                System.out.println(line);
                return null;
            }
        };
        return printingHandler;
    }

}

и

/**
 * Configuration properties holder bound under the {@code foo} prefix; it carries a single
 * string property named {@code foo}.
 */
@ConfigurationProperties(prefix = "foo")
public class MyProperties {

    /** Backing field for the {@code foo} property. */
    private String foo;

    /**
     * @return the current value of {@code foo}, or {@code null} if it was never set
     */
    public String getFoo() {
        return foo;
    }

    /**
     * @param foo the new value of the {@code foo} property
     */
    public void setFoo(String foo) {
        this.foo = foo;
    }

}

и

foo.foo=bar

но все работало нормально ...

foo:bar

Я тестировал со Spring Boot 1.5.12 и <spring-cloud.version>Edgware.SR3</spring-cloud.version> (а также с 2.0.2 и Finchley.BUILD-SNAPSHOT).

Я вижу, что вы используете довольно старую версию Spring Boot (и, вероятно, Spring Cloud тоже). Можете ли вы попробовать более новые версии?

...