Я следую простой документации по использованию потоков Kafka с пружинной загрузкой ( Spring guide )
Мне понятно, как входить и выходить сообщения, а затем посерединеЯ могу выполнить некоторую обработку, заменив @KafkaListener
и kafkaTemplate.send()
Так что я создал супер простой базовый класс, подобный этому:
@EnableBinding(Processor.class)
public static class UppercaseTransformer {
@StreamListener
@Input(Processor.INPUT)
public void receive(String input) {
System.out.println(input);
}
}
и затем (и, возможно, этомоя ошибка), из контроллера я делаю это:
template.send("my-topic","hello world");
Я использую потоки Spring Cloud с такой конфигурацией:
spring:
cloud:
stream:
bindings:
input:
destination: my-topic
group: ${spring.application.name}
consumer:
concurrency: ${KAFKA_CONSUMER_CONCURRENCY:3}
output:
destination: my-topic
kafka:
binder:
brokers: localhost:9092
auto-create-topics: false
required-acks: all
transaction:
transaction-id-prefix: ${spring.application.name}-
producer:
configuration:
retries: 3
bindings:
input:
consumer:
configuration:
isolation.level: read_committed
enable-dlq: true
dlq-name: some-name
Также пробовал это с потребителем и слушателем
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
и каждый раз, когда я пытаюсь отправить сообщение, я получаю это:
class java.lang.String cannot be cast to class [B (java.lang.String and [B are in module java.base of loader 'bootstrap')
Понятия не имею, что не так и почему так много переходят от обычного слушателя к этой версии ...идеи?