Как отметил Гари Рассел выше, ConsumerSeekCallback является устаревшим, поэтому он не разрешен ... и я не буду открывать вопрос GitHub ...
Я наконец смог достичь своей цели:
Когда пользователь входит в систему, он должен видеть все события, которые он пропустил со времени своего последнего сеанса.
путем обработки всех новых подписок в EventListener дляListenerContainerIdleEvent, где потребитель доступен как часть данных события:
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void idleEventHandler(ListenerContainerIdleEvent event) {
// find new subscriptions
Collection<EventListenerSubscription> newSubscriptions =
subscriptions.stream().filter(s -> s.isNew())
.collect(Collectors.toList());
if (!newSubscriptions.isEmpty()) {
// mark subscriptions a not new
newSubscriptions.forEach(s -> s.setNew(false));
// compute the oldest time stamp
OptionalLong oldestTimeStamp =
newSubscriptions.stream()
.mapToLong(s -> s.getLastTimeStamp())
.reduce(Long::min);
if (oldestTimeStamp.isPresent()) {
// seek on topic for oldest time stamp
Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
timestampsToSearch.put(new TopicPartition(eventTopic, 0),
oldestTimeStamp.getAsLong());
Consumer<?, ?> consumer = event.getConsumer();
event.getConsumer().offsetsForTimes(timestampsToSearch).forEach((k, v) -> {
consumer.seek(k, v.offset());
});
}
}
}
Я определяю самую старую отметку времени во всех новых подписках, помечаю эти подписки как не новые и использую поиск потребителя нараздел для самой старой отметки времени.
Чтобы получить событие простоя контейнера, интервал простоя должен быть настроен в свойствах контейнера, как описано здесь .
Затем KafkaListener позаботится об отправке старых событий подписчикам (ранее новым):
@KafkaListener(id = "qux", topics = { "${app.event.topic}" }, errorHandler = "kafkaListenerErrorHandler")
public void receive(@Payload Event event, @Headers MessageHeaders headers) throws JsonProcessingException {
// collect the subscribers not marked as new
Collection<EventListenerSubscription> oldSubscriptions =
subscriptions.stream().filter(s -> !s.isNew())
.collect(Collectors.toList());
for (EventListenerSubscription s : oldSubscriptions) {
if (s.getLastTimeStamp() < timestamp) {
s.addMessage(event, timestamp);
}
}
}