Как искать смещение в Spring Kafka клиенту? - PullRequest
0 голосов
/ 12 февраля 2020

Мы используем ConsumerSeekAware и seekToBeginning при разработке микросервиса, используя данные из канала. Когда мы будем готовы, мы бы хотели, чтобы потребитель использовал смещение. Из-за документации я не смог понять, что означает seekToEnd. Означает ли это, что он будет искать смещение (последнее сообщение ACKed) или будет искать последнее сообщение topi c (независимо от смещения) и ждет новых?

@Slf4j
public class KafkaSeekOffsetAwareListener implements ConsumerSeekAware {
    protected final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        log.info("Registering seek callback");
        this.seekCallBack.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        log.info("Incoming Topic Partitions {}", assignments);
---->   assignments.forEach((assignment, assignmentKey) -> assignments.forEach((t, o) -> callback.seekToBeginning(t.topic(), t.partition())));
    }

    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        log.info("Container idle");
    }
}

Ответы [ 3 ]

1 голос
/ 12 февраля 2020

См. JavaDocs:

/**
 * Seek to the last offset for each of the given partitions. This function evaluates lazily, seeking to the
 * final offset in all partitions only when {@link #poll(Duration)} or {@link #position(TopicPartition)} are called.
 * If no partitions are provided, seek to the final offset for all of the currently assigned partitions.
 * <p>
 * If {@code isolation.level=read_committed}, the end offset will be the Last Stable Offset, i.e., the offset
 * of the first message with an open transaction.
 *
 * @throws IllegalArgumentException if {@code partitions} is {@code null}
 * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
 */
@Override
public void seekToEnd(Collection<TopicPartition> partitions) {

Как видите, это чистый Apache клиент Kafka. Spring для Apache Kafka делает не что иное, как делегирование этой функциональности. Я бы даже сказал, что неправильно говорить, что Spring для Apache Kafka - это «клиент для Kafka». Это просто оболочка и высокоуровневый Spring API для стандартного Apache клиента Kafka.

0 голосов
/ 12 февраля 2020

seekToEnd означает именно это; установить позицию в КОНЕЦ раздела, независимо от «текущей» позиции данного потребителя; следующая полученная запись будет следующей, добавленной в раздел после выполнения поиска.

Любые записи, опубликованные при остановке потребителя, будут пропущены.

0 голосов
/ 12 февраля 2020

Из-за документации я не смог понять, что означает seekToEnd. Означает ли это, что он будет искать смещение (последнее сообщение ACKed) или будет искать последнее сообщение из topi c (независимо от смещения) и ждет новых?

Это одно и то же. Последнее смещение в топи c является последним подтвержденным (при условии, что производители даже включили подтверждение), а поиск до конца перемещает «курсор» потребления в конец независимо от смещений

...