Канал Spring Cloud Stream Kafka не работает в загрузочном приложении Spring - PullRequest
0 голосов
/ 30 апреля 2018

Я пытался заставить входящий SubscribeableChannel и исходящий MessageChannel работать в моем приложении для весенней загрузки.

Я успешно настроил канал kafka и успешно его протестировал.

Кроме того, я создал базовое приложение для весенней загрузки, которое проверяет добавление и получение данных из канала.

Проблема, с которой я сталкиваюсь, заключается в том, что когда я помещаю эквивалентный код в приложение, к которому он принадлежит, кажется, что сообщения никогда не отправляются и не принимаются. Отладкой трудно определить, что происходит, но единственное, что мне кажется другим, это название канала. В рабочем значении имя канала похоже на application.channel в нерабочем приложении, его localhost: 8080 / channel.

Мне было интересно, есть ли какая-то конфигурация весенней загрузки, блокирующая или изменяющая создание каналов в другой источник канала?

У кого-нибудь были подобные проблемы?

application.yml

spring:
  datasource:
    url: jdbc:h2:mem:dpemail;DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE
    platform: h2
    username: hello
    password: 
    driverClassName: org.h2.Driver    
  jpa:
    properties:
      hibernate:
        show_sql: true
        use_sql_comments: true
        format_sql: true

  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        email-in:
          destination: email
          contentType: application/json
        email-out:
          destination: email
          contentType: application/json

E-mail

public class Email {

    private long timestamp;

    private String message;

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

}

Binding Config

@EnableBinding(EmailQueues.class)
public class EmailQueueConfiguration {    
}

Интерфейс

public interface EmailQueues {

    String INPUT = "email-in";
    String OUTPUT = "email-out";

    @Input(INPUT)
    SubscribableChannel inboundEmails();

    @Output(OUTPUT)
    MessageChannel outboundEmails();
}

Контроллер

 @RestController
    @RequestMapping("/queue")
    public class EmailQueueController {

        private EmailQueues emailQueues;


        @Autowired
        public EmailQueueController(EmailQueues emailQueues) {
            this.emailQueues = emailQueues;
        }

        @RequestMapping(value = "sendEmail", method = POST)
        @ResponseStatus(ACCEPTED)
        public void sendToQueue() {
            MessageChannel messageChannel = emailQueues.outboundEmails();
            Email email = new Email();
            email.setMessage("hello world: " + System.currentTimeMillis());
            email.setTimestamp(System.currentTimeMillis());

            messageChannel.send(MessageBuilder.withPayload(email).setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON).build());

        }

        @StreamListener(EmailQueues.INPUT)
        public void handleEmail(@Payload Email email) {
            System.out.println("received: " + email.getMessage());
        }
    }

Я не уверен, что один из унаследованных проектов конфигурации, использующих Spring-Cloud, Spring-Cloud-Sleuth может препятствовать его работе, но даже когда я удаляю его, он все еще не работает. Но в отличие от моего приложения, которое работает с приведенным выше кодом, я никогда не вижу настроенного ConsumeConfig, например:

o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 100
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = consumer-2
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true

(Эта конфигурация - то, что я вижу в моем основном приложении Spring Boot при запуске приведенного выше кода, и код работает для записи и чтения из канала kafka) ....

Я предполагаю, что в одной из библиотек, которые я использую для создания канала другого типа, имеется некоторая конфигурация загрузки, которую я просто не могу найти, что это за конфигурация.

Ответы [ 2 ]

0 голосов
/ 01 мая 2018

Хорошо, после долгих отладок ... Я обнаружил, что что-то создает Binder Support Test Binder (как пока не знаю), поэтому очевидно, что это используется, чтобы не влиять на добавление сообщений в реальный канал.

После добавления

@SpringBootApplication(exclude = TestSupportBinderAutoConfiguration.class)

Конфигурации канала kafka сработали, и сообщения добавляются ... было бы интересно узнать, что на самом деле настраивает это связующее средство поддержки тестирования ... В конце концов я найду эту присоску.

0 голосов
/ 30 апреля 2018

То, что вы опубликовали, содержит много несвязанных настроек, поэтому сложно определить, что мешает. Кроме того, когда вы говорите ".. кажется, что сообщения никогда не отправляются или не принимаются .." есть ли исключения в журналах? Также, пожалуйста, укажите версию Kafka, которую вы используете, а также Spring Cloud Stream. Теперь я попытался воспроизвести его на основе вашего кода (после некоторой очистки, чтобы оставить только соответствующие части) и смог успешно отправить / получить.

Моя версия Kafka - 0.11 и Spring Cloud Stream 2.0.0. Вот соответствующий код:

spring:   
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        email-in:
          destination: email
        email-out:
          destination: email

@SpringBootApplication
@EnableBinding(KafkaQuestionSoApplication.EmailQueues.class)
public class KafkaQuestionSoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaQuestionSoApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(EmailQueues emailQueues) {
        return new ApplicationRunner() {
            @Override
            public void run(ApplicationArguments args) throws Exception {
                emailQueues.outboundEmails().send(new GenericMessage<String>("Hello"));
            }
        };
    }

    @StreamListener(EmailQueues.INPUT)
    public void handleEmail(String payload) {
        System.out.println("received: " + payload);
    }


    public interface EmailQueues {
        String INPUT = "email-in";
        String OUTPUT = "email-out";

        @Input(INPUT)
        SubscribableChannel inboundEmails();

        @Output(OUTPUT)
        MessageChannel outboundEmails();
    }
}
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...