Слушатели Кафки, не работающие со Струнами? - PullRequest
0 голосов
/ 28 января 2019

Я следую простой документации по использованию потоков Kafka с пружинной загрузкой ( Spring guide )

Мне понятно, как входить и выходить сообщения, а затем посерединеЯ могу выполнить некоторую обработку, заменив @KafkaListener и kafkaTemplate.send()

Так что я создал супер простой базовый класс, подобный этому:

@EnableBinding(Processor.class)
public static class UppercaseTransformer {

  @StreamListener
  @Input(Processor.INPUT)
  public void receive(String input) {
    System.out.println(input);
  }
}

и затем (и, возможно, этомоя ошибка), из контроллера я делаю это:

 template.send("my-topic","hello world");

Я использую потоки Spring Cloud с такой конфигурацией:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: my-topic
          group: ${spring.application.name}
          consumer:
            concurrency: ${KAFKA_CONSUMER_CONCURRENCY:3}
        output:
          destination: my-topic
      kafka:
        binder:
          brokers: localhost:9092
          auto-create-topics: false
          required-acks: all
          transaction:
            transaction-id-prefix: ${spring.application.name}-
            producer:
              configuration:
                retries: 3
        bindings:
          input:
            consumer:
              configuration:
                isolation.level: read_committed
              enable-dlq: true
              dlq-name: some-name

Также пробовал это с потребителем и слушателем

key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
              value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

и каждый раз, когда я пытаюсь отправить сообщение, я получаю это:

class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')

Понятия не имею, что не так и почему так много переходят от обычного слушателя к этой версии ...идеи?

1 Ответ

0 голосов
/ 29 января 2019

Я только что создал приложение из start.spring.io и выбрал «Облачный поток» и «Кафка».Сгенерировал проект и добавил его в основной класс (Использовал ту же конфигурацию, которую вы указали выше).

@SpringBootApplication
@EnableBinding(Processor.class)
public class So54408906Application {

    public static void main(String[] args) {
        SpringApplication.run(So54408906Application.class, args);
    }

    @StreamListener(Processor.INPUT)
    public void receive(String input) {
        System.out.println(input);
    }

}

Затем запустил скрипт производителя консоли kafka.

kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

Текст предоставлен вскрипт регистрировался на консоли приложения.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...