нужна помощь, пытался в течение многих дней заставить эту базовую вещь работать. У меня есть одна topi c, которая должна запускать потоковые события, которые он просматривает, используя k sql. Но приложение весенней загрузки просто получает сообщение в topi c (да, хочу, чтобы это было у одного потребителя), и потоковое событие не происходит. Может быть связано с сериализацией, иногда я мог получать сообщения через. Ключ строки и значение json. Spring Cloud Stream Binder Kafka Streams.
Этот должен получать события, но нет. Чистый топи c получает сообщения. Не поток
@Bean
public Consumer<KStream<?, String>> process() {
return input -> input.foreach((eventKey, event) -> {
System.out.println("STREAM EVENT: " + event);
});
}
Также есть:
public interface EventsBinding {
String INPUT = "monitoring_events";
// Input topic
@Input(INPUT)
SubscribableChannel monitoringEvents();
}
и
@Service
@Slf4j
public class EventsConsumer {
private EventsService eventsService;
public EventsConsumer(EventsService eventsService) {
this.eventsService = eventsService;
}
@StreamListener(target = EventsBinding.INPUT)
//@SendTo(EventsBinding.OUTPUT)
public void handleEvents(Event event) {
log.info("Received event: {}", event);
}
}
spring:
application:
name: events-monitor
kafka:
bootstrap-servers: localhost:9092
cloud:
stream:
# Activate functions aka event handlers
function:
definition: process
bindings:
input:
contentType: application/json
process-in-0:
destination: monitoring_events
kafka:
bindings:
process-in-0:
destination: monitoring_events
contentType: application/json
useNativeEncoding: true
group: events-monitoring
streams:
binder:
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
applicationId: events-monitor
functions.process.applicationId: events-monitor