проблемы spring-cloud-stream-binder-kafka-streams при получении потоковых событий - PullRequest
0 голосов
/ 25 февраля 2020

нужна помощь, пытался в течение многих дней заставить эту базовую вещь работать. У меня есть одна topi c, которая должна запускать потоковые события, которые он просматривает, используя k sql. Но приложение весенней загрузки просто получает сообщение в topi c (да, хочу, чтобы это было у одного потребителя), и потоковое событие не происходит. Может быть связано с сериализацией, иногда я мог получать сообщения через. Ключ строки и значение json. Spring Cloud Stream Binder Kafka Streams.

Этот должен получать события, но нет. Чистый топи c получает сообщения. Не поток

@Bean
public Consumer<KStream<?, String>> process() {
return input -> input.foreach((eventKey, event) -> {
System.out.println("STREAM EVENT: " + event);
});
}

Также есть:

public interface EventsBinding {

  String INPUT = "monitoring_events";

  // Input topic
  @Input(INPUT)
  SubscribableChannel monitoringEvents();

}

и

@Service
@Slf4j
public class EventsConsumer {

  private EventsService eventsService;

  public EventsConsumer(EventsService eventsService) {
    this.eventsService = eventsService;
  }

  @StreamListener(target = EventsBinding.INPUT)
  //@SendTo(EventsBinding.OUTPUT)
  public void handleEvents(Event event) {
    log.info("Received event: {}", event);
  }
}
spring:
  application:
    name: events-monitor
  kafka:
    bootstrap-servers: localhost:9092
  cloud:
    stream:
      # Activate functions aka event handlers
      function:
        definition: process
      bindings:
        input:
          contentType: application/json
        process-in-0:
          destination: monitoring_events
      kafka:
        bindings:
          process-in-0:
            destination: monitoring_events
            contentType: application/json
            useNativeEncoding: true
            group: events-monitoring
        streams:
          binder:
            configuration:
              commit.interval.ms: 1000
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            applicationId: events-monitor
            functions.process.applicationId: events-monitor
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...