Spring Cloud StreamListener @ Output KStream Serdes не работает - PullRequest
0 голосов
/ 25 февраля 2019

У меня есть Stream Listener как

@StreamListener(target = "requesti")
@SendTo("responseo")
public KStream<UUID,Account> process(KStream<UUID, Account> events) {
    // Predicate<UUID, Event> isAccount = (key, value) ->
    // value.getEntity().getClass().equals(Account.class);

    // @formatter:off
    return events
            //.filter(isAccount)
            .peek((key, value) -> {
                log.debug("Processing {} {}", key, value);
            });
            /*
            .filter(isAccount)
            .map((key, value) -> process(value))

            .peek((key, value) -> {
                log.debug("Processed {} {}", key, value);
            });
            */
    // @formatter:on

}

Где конфигурация @Input ("requestti") следующая:

spring.cloud.stream.kafka.streams.bindings.requesti.consumer.application-id=repo-event-consumer
spring.cloud.stream.bindings.requesti.destination=request
spring.cloud.stream.bindings.requesti.content-type=application/json
spring.cloud.stream.bindings.requesti.consumer.header-mode=raw

и @output ("responseo")Конфигурация выглядит следующим образом:

spring.cloud.stream.kafka.streams.bindings.responseo.consumer.application-id=repo-response-producer
spring.cloud.stream.bindings.responseo.destination=response
spring.cloud.stream.bindings.responseo.content-type=application/json
spring.cloud.stream.bindings.responseo.producer.header-mode=raw
spring.cloud.stream.bindings.responseo.producer.use-native-encoding=true
spring.cloud.stream.kafka.streams.bindings.responseo.producer.key-serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.bindings.responseo.producer.value-serde=org.springframework.kafka.support.serializer.JsonSerde

Мой Процессор получает запрос и может также отправлять вывод, но вывод следующий:

[Producer clientId = repo-event-consumer -49827b40-2357-4af0-8103-228343faa59e-StreamThread-1-Manufacturer] Отправка записи ProducerRecord (тема = ответ, раздел = ноль, заголовки = RecordHeaders (заголовки = [RecordHeader (ключ = TypeId ), значение = [117, 107, 46, 111, 114, 103, 46, 99, 97, 116, 97, 112, 117, 108, 116, 46, 101, 115, 46, 99, 117, 98, 101,46, 115, 101, 114, 118, 105, 99, 101, 115, 46, 97, 99, 99, 111, 117, 110, 116, 46, 109, 111, 100, 101, 108, 46, 65,99, 99, 111, 117, 110, 116])], isReadOnly = true), ключ = [B @ 6a5e4294, значение = [B @ 5a0852e1, отметка времени = 1551093349173) с обратным вызовом org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1@336dbba5 к ответной теме темына 2

Несколько вещей, которые я путаю с идентификатором записи производителя, не является "repo-response-produser", и, во-вторых, key-serde / value-serde не используется, на мой взгляд, это должно было быть

Отправка записи ProducerRecord (тема = запрос, раздел = ноль, заголовки = RecordHeaders (заголовки = [RecordHeader (ключ = Key_TypeId * , значение = [106, 97, 118, 97, 46, 117, 116,105, 108, 46, 85, 85, 73, 68]), RecordHeader (ключ = TypeId , значение = [117, 107, 46, 111, 114, 103, 46, 99, 97, 116, 97, 112, 117, 108, 116, 46, 101, 115, 46, 99, 117, 98, 101, 46, 115, 101, 114, 118, 105, 99, 101, 115, 46, 97, 99, 99, 111, 117, 110, 116, 46, 109, 111, 100, 101, 108, 46, 65, 99, 99, 111, 117, 110, 116])], isReadOnly = true), ключ = 6f0f50e2-3add-4d22-a370-cac66d016af0, значение = Account () с обратным вызовом org.springframework.kafka.core.KafkaTemplate$$Lambda$582/533392019@85ab964 для раздела запроса темы 2

и по умолчанию serdeConfig

    spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde

Репо

1 Ответ

0 голосов
/ 26 февраля 2019

Вот пример, который демонстрирует работу JsonSerde на исходящем входе с использованием связывателя Kafka Streams: https://github.com/schacko-samples/json-serde-example. Запустите пример и убедитесь, что он работает.Посмотрите на application.yml для деталей конфигурации.Я поставил некоторые детали в предоставленном README.

...