Мы используем ConsumerSeekAware
и seekToBeginning
при разработке микросервиса, используя данные из канала. Когда мы будем готовы, мы бы хотели, чтобы потребитель использовал смещение. Из-за документации я не смог понять, что означает seekToEnd
. Означает ли это, что он будет искать смещение (последнее сообщение ACKed) или будет искать последнее сообщение topi c (независимо от смещения) и ждет новых?
@Slf4j
public class KafkaSeekOffsetAwareListener implements ConsumerSeekAware {
protected final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
log.info("Registering seek callback");
this.seekCallBack.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
log.info("Incoming Topic Partitions {}", assignments);
----> assignments.forEach((assignment, assignmentKey) -> assignments.forEach((t, o) -> callback.seekToBeginning(t.topic(), t.partition())));
}
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
log.info("Container idle");
}
}