Spring cloud stream (kafka binder) добавлен метод паузы и возобновления работы потребителя
@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@StreamListener(Sink.INPUT)
public void in(String in, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
consumer.pause(Collections.singleton(new TopicPartition("myTopic", 0)));
}
@Bean
public ApplicationListener<ListenerContainerIdleEvent> idleListener() {
return event -> {
System.out.println(event);
if (event.getConsumer().paused().size() > 0) {
event.getConsumer().resume(event.getConsumer().paused());
}
};
}
}
пожалуйста, проверьте документы
https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_usage_examples
но я думаю, что есть некоторая проблема с методом паузы:
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/479
PS /
Вы можете получить идентификатор раздела и название темы в примере прослушивателя:
@StreamListener(Sink.INPUT)
public void in(String in,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer) {
System.out.println(in);
TopicPartition p = new TopicPartition(topic, partition);
consumer.pause(Collections.singleton(p));
}
или в errorChannel global Listener
@StreamListener("errorChannel")
public void errorGlobal(Message<?> message) {
Message<?> failedMessage = ((ErrorMessage)message).getOriginalMessage();
Consumer consumer = (Consumer)failedMessage.getHeaders().get(KafkaHeaders.CONSUMER);
int partition = (int) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID);
String topic = (String) failedMessage.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC);
TopicPartition p = new TopicPartition(topic, partition);
// ?
consumer.pause(Collections.singleton(p));
}