Spring Cloud Stream configuration for KTable input and output
4 June 2019

I declared the inputs and outputs:

public interface UserStream {

    String USER_CREATE_EVENT_OUTPUT = "user-event-out";
    String USER_IMPORT = "user-import";
    String USER_EXPORT = "user-export";

    @Output(USER_CREATE_EVENT_OUTPUT)
    MessageChannel userEventOutput();

    @Output(USER_EXPORT)
    KTable<?, ?> userInput();

    @Input(USER_IMPORT)
    KTable<?, ?> userOutput();

}

and a processor:

@EnableBinding(UserStream.class)
....
@StreamListener
@SendTo(UserStream.USER_EXPORT)
public KTable<UserIdDto, UserDto> webQuoteUserConsumer(@Input(UserStream.USER_IMPORT) KTable<UserIdDto, WebQuoteUser> users) {
    return users
            .mapValues((id, user) -> createNewUser(user, id))
            .filter((id, user) -> user.isPresent())
            .mapValues((id, user) -> mapper.mapToUserDto(user.get()));
}
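To make the intent of the processor easier to follow, here is a minimal plain-Java sketch of the same three steps (map each value to an Optional, drop the empty ones, unwrap and convert the rest). It does not use Kafka Streams or Spring at all: a LinkedHashMap stands in for the KTable, and the createNewUser and mapToUserDto bodies are hypothetical stand-ins, since the real ones are not shown in the question. It also ignores KTable-specific behaviour such as tombstones for filtered-out records.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

class UserPipelineSketch {

    // Hypothetical stand-in for createNewUser: blank names are rejected.
    static Optional<String> createNewUser(String rawName, String id) {
        if (rawName == null || rawName.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(rawName.trim());
    }

    // Hypothetical stand-in for mapper.mapToUserDto.
    static String mapToUserDto(String user) {
        return "dto(" + user + ")";
    }

    // Mirrors mapValues -> filter -> mapValues on an ordinary map.
    static Map<String, String> process(Map<String, String> users) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : users.entrySet()) {
            // Step 1 (mapValues): try to create a user from the raw value.
            Optional<String> created = createNewUser(entry.getValue(), entry.getKey());
            // Step 2 (filter): keep only entries where creation succeeded.
            if (created.isPresent()) {
                // Step 3 (mapValues): unwrap the Optional and convert to a DTO.
                result.put(entry.getKey(), mapToUserDto(created.get()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> users = new LinkedHashMap<>();
        users.put("1", "alice");
        users.put("2", "  ");
        System.out.println(process(users)); // prints {1=dto(alice)}
    }
}
```

The entry with key "2" is dropped because the stand-in createNewUser returns an empty Optional for a blank name, which is the role the filter step plays in the processor above.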

but I got the following error:

Caused by: java.lang.IllegalStateException: No factory found for binding target type: org.apache.kafka.streams.kstream.KTable among registered factories: channelFactory,messageSourceFactory

It looks like I am missing something in the configuration. Here is what I put in my yml:

spring:
  cloud:
    stream:
      bindings:
        user_import:
          group: steaming-service-group
          destination: user-export-topic
          contentType: application/json
          consumer:
            resetOffsets: true
            startOffset: earliest
            header-mode: headers
        quote_import:
          group: steaming-service-group
          destination: web-quotes
          contentType: application/json
          consumer:
            resetOffsets: true
            startOffset: earliest
            header-mode: headers
        quote_export:
          destination: quote_import-topic
          contentType: application/json
          producer:
            header-mode: headers

....

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          zkNodes: localhost:2181
        streams:
          binder:
            brokers: localhost:9092
          configuration:
            default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            commit.interval.ms: 1000

The sample code was taken from https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/kafka-streams-samples/kafka-streams-table-join

...