Я пытаюсь реализовать PollableConsumer, который начинает опрашивать сообщения от Kafka при определенных условиях, в этом случае, когда я достигаю конечной точки в моем приложении SpringBoot.
Я пробовал несколько способов запуска опроса при определенных условиях, но, видимо, он работает, только если он постоянно опрашивает тему кафки. (как и во всех примерах в документах Spring-Cloud-Stream)
Я ищу что-то вроде этого:
public interface CustomProcessor {
@Input
PollableMessageSource input();
}
public void run() {
boolean result = true;
while (result) {
result = input.poll(m -> {
Event event = (Event) m.getPayload();
GenericMessage<Event> genericMessage = new GenericMessage<>(event, m.getHeaders());
eventMessageConsumer.consume(genericMessage);
}, new ParameterizedTypeReference<Event>() {
});
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
if (result) {
System.out.println("Success");
}
}
}
Это может сработать, когда я достигну такой конечной точки:
@GetMapping("/process")
public void process() {
SomeClass.run();
}