У меня всё работает нормально; я проверял на Spring Boot 2.0.5 и Spring Kafka 2.1.10 ...
/**
 * Demo application that reads the same topic in two ways: through a
 * {@code @KafkaListener} method and through a manually polled
 * {@link GenericConsumer} started from an {@link ApplicationRunner}.
 *
 * <p>Both readers log only the record whose value equals {@link #LAST_RECORD},
 * so the log shows when each of them has reached the end of the test data.
 */
@SpringBootApplication
public class So52284259Application implements ConsumerAwareRebalanceListener {

    private static final Logger logger = LoggerFactory.getLogger(So52284259Application.class);

    /** Topic shared by the producer snippet, the listener and the manual consumer. */
    private static final String TOPIC = "so52284259";

    /** Value of the final test record ("foo" + 999). */
    private static final String LAST_RECORD = "foo999";

    public static void main(String[] args) {
        SpringApplication.run(So52284259Application.class, args);
    }

    /**
     * Runs once at startup and polls {@link #TOPIC} with the manual consumer
     * under the group id {@code "generic"}.
     *
     * @param template used only by the (commented-out) seeding loop below
     * @param consumer the manually polled consumer
     * @return the runner executed by Spring Boot after the context has started
     */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, GenericConsumer consumer) {
        return args -> {
            // Uncomment on the first run to load up the topic with foo0..foo999:
            // for (int i = 0; i < 1000; i++) {
            //     template.send(TOPIC, "foo" + i);
            // }
            consumer.poll(TOPIC, "generic");
        };
    }

    /**
     * Listener-container side of the demo; logs only the last test record.
     *
     * @param in the record value
     */
    @KafkaListener(id = "listener", topics = TOPIC)
    public void listen(String in) {
        if (LAST_RECORD.equals(in)) {
            logger.info("@KafkaListener: {}", in);
        }
    }

    /**
     * Rewinds every newly assigned partition to its first offset.
     *
     * <p>NOTE(review): nothing in this class registers the listener with a
     * container factory explicitly; confirm it is actually picked up.
     */
    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        consumer.seekToBeginning(partitions);
    }

    /**
     * Declares the demo topic with one partition and a replication factor of one.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic topic() {
        return new NewTopic(TOPIC, 1, (short) 1);
    }
}
/**
 * Manually polled consumer that replays a topic from the beginning and stops
 * once it has seen the record whose value equals {@link #LAST_RECORD}.
 */
@Component
class GenericConsumer {

    private static final Logger logger = LoggerFactory.getLogger(GenericConsumer.class);

    /** Value of the record that ends the replay. */
    private static final String LAST_RECORD = "foo999";

    @Autowired
    ConsumerFactory<String, String> consumerFactory;

    /**
     * Subscribes a fresh consumer to {@code topic}, seeks to the beginning of
     * whatever partitions it was assigned, and polls until a batch containing
     * {@link #LAST_RECORD} has been processed. The consumer is always closed,
     * including when a Kafka call throws.
     *
     * <p>This method blocks indefinitely if the sentinel record never arrives.
     *
     * @param topic the topic to read
     * @param group the consumer group id to read with
     */
    public void poll(String topic, String group) {
        logger.info("---------- Polling kafka records from topic {} group {}", topic, group);
        // try-with-resources: the original only closed the consumer on the happy path.
        try (Consumer<String, String> consumer = consumerFactory.createConsumer(group, "")) {
            consumer.subscribe(Arrays.asList(topic));
            // need to make a dummy poll before we can seek
            consumer.poll(1000);
            // NOTE(review): if no partitions were assigned within the dummy poll,
            // assignment() is empty here and nothing is rewound - confirm this is acceptable.
            consumer.seekToBeginning(consumer.assignment());
            boolean done = false;
            while (!done) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                logger.info("------------ Total {} records polled", records.count());
                // The whole batch is scanned even after the sentinel is found,
                // matching the original behaviour.
                for (ConsumerRecord<String, String> record : records) {
                    String value = record.value();
                    if (LAST_RECORD.equals(value)) {
                        logger.info("Consumer: {}", value);
                        done = true;
                    }
                }
            }
        }
    }
}
и вывод в журнале:
2018-09-12 09:35:25.929 INFO 61390 --- [ main] com.example.GenericConsumer : ------------ Total 500 records polled
2018-09-12 09:35:25.931 INFO 61390 --- [ main] com.example.GenericConsumer : ------------ Total 500 records polled
2018-09-12 09:35:25.932 INFO 61390 --- [ main] com.example.GenericConsumer : Consumer: foo999
2018-09-12 09:35:25.942 INFO 61390 --- [ listener-0-C-1] com.example.So52284259Application : @KafkaListener: foo999