Как прочитать все записи в теме Кафки - PullRequest
0 голосов
/ 11 февраля 2019

Я использую kafka: kafka_2.12-2.1.0, подпружиненный kafka на стороне клиента и застрял с проблемой.

Мне нужно загрузить карту в памяти, прочитав все существующие сообщенияв теме кафки.Я сделал это, запустив нового потребителя (с уникальным идентификатором группы потребителей и установив смещение earliest).Затем я перебираю потребителя (метод poll), чтобы получить все сообщения и останавливаться, когда записи потребителя становятся пустыми.

Но я заметил, что, когда я начинаю опрос, первые несколько итераций возвращают записи потребителя как пустые, а затемон начинает возвращать фактические записи.Теперь это нарушает мою логику, так как наш код считает, что в теме нет записей.

Я пробовал несколько других способов (например, использование числа смещений), но не смог придумать никакого решения, кромегде-то храню еще одну запись, которая сообщает мне, сколько сообщений в теме нужно прочитать, прежде чем я остановлюсь.

Есть идеи, пожалуйста?

Ответы [ 2 ]

0 голосов
/ 14 февраля 2019

Вы должны использовать 2 потребителей, один для загрузки смещений, а другой для чтения всех записей.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

public class KafkaRecordReader {

    static final Map<String, Object> props = new HashMap<>();
    static {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "sample-client");
    }

    public static void main(String[] args) {
        final Map<TopicPartition, OffsetInfo> partitionOffsetInfos = getOffsets(Arrays.asList("world, sample"));
        final List<ConsumerRecord<byte[], byte[]>> records = readRecords(partitionOffsetInfos);

        System.out.println(partitionOffsetInfos);
        System.out.println("Read : " + records.size() + " records");
    }

    private static List<ConsumerRecord<byte[], byte[]>> readRecords(final Map<TopicPartition, OffsetInfo> offsetInfos) {
        final Properties readerProps = new Properties();
        readerProps.putAll(props);
        readerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "record-reader");

        final Map<TopicPartition, Boolean> partitionToReadStatusMap = new HashMap<>();
        offsetInfos.forEach((tp, offsetInfo) -> {
            partitionToReadStatusMap.put(tp, offsetInfo.beginOffset == offsetInfo.endOffset);
        });

        final List<ConsumerRecord<byte[], byte[]>> cachedRecords = new ArrayList<>();
        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(readerProps)) {
            consumer.assign(offsetInfos.keySet());
            for (final Map.Entry<TopicPartition, OffsetInfo> entry : offsetInfos.entrySet()) {
                consumer.seek(entry.getKey(), entry.getValue().beginOffset);
            }

            boolean close = false;
            while (!close) {
                final ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<byte[], byte[]> record : consumerRecords) {
                    cachedRecords.add(record);
                    final TopicPartition currentTp = new TopicPartition(record.topic(), record.partition());
                    if (record.offset() + 1 == offsetInfos.get(currentTp).endOffset) {
                        partitionToReadStatusMap.put(currentTp, true);
                    }
                }

                boolean done = true;
                for (final Map.Entry<TopicPartition, Boolean> entry : partitionToReadStatusMap.entrySet()) {
                    done &= entry.getValue();
                }
                close = done;
            }
        }
        return cachedRecords;
    }

    private static Map<TopicPartition, OffsetInfo> getOffsets(final List<String> topics) {
        final Properties offsetReaderProps = new Properties();
        offsetReaderProps.putAll(props);
        offsetReaderProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset-reader");

        final Map<TopicPartition, OffsetInfo> partitionOffsetInfo = new HashMap<>();
        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(offsetReaderProps)) {
            final List<PartitionInfo> partitionInfos = new ArrayList<>();
            topics.forEach(topic -> partitionInfos.addAll(consumer.partitionsFor("sample")));
            final Set<TopicPartition> topicPartitions = partitionInfos
                    .stream()
                    .map(x -> new TopicPartition(x.topic(), x.partition()))
                    .collect(Collectors.toSet());
            consumer.assign(topicPartitions);
            final Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
            final Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);

            for (final TopicPartition tp : topicPartitions) {
                partitionOffsetInfo.put(tp, new OffsetInfo(beginningOffsets.get(tp), endOffsets.get(tp)));
            }
        }
        return partitionOffsetInfo;
    }

    private static class OffsetInfo {

        private final long beginOffset;
        private final long endOffset;

        private OffsetInfo(long beginOffset, long endOffset) {
            this.beginOffset = beginOffset;
            this.endOffset = endOffset;
        }

        @Override
        public String toString() {
            return "OffsetInfo{" +
                    "beginOffset=" + beginOffset +
                    ", endOffset=" + endOffset +
                    '}';
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            OffsetInfo that = (OffsetInfo) o;
            return beginOffset == that.beginOffset &&
                    endOffset == that.endOffset;
        }

        @Override
        public int hashCode() {
            return Objects.hash(beginOffset, endOffset);
        }
    }
}
0 голосов
/ 11 февраля 2019

Насколько я понимаю, вы пытаетесь добиться, чтобы в вашем приложении была построена карта на основе значений, которые уже находятся в определенной теме.

Для этой задачи вместо ручного опроса темы, вы можете использовать Ktable в DSL Kafka Streams, который автоматически создаст читаемое хранилище значений ключей, которое будет отказоустойчивым, с поддержкой репликации и автоматически заполнится новыми значениями.

Вы можете сделать это простовызывая groupByKey в потоке, а затем используя агрегат.

KStreamBuilder builder = new KStreamBuilder();
KStream<String, Long> myKStream = builder.stream(Serdes.String(), Serdes.Long(), "topic_name");
KTable<String, Long> totalCount = myKStream.groupByKey().aggregate(this::initializer, this::aggregator);

(Фактический код может отличаться в зависимости от версии kafka, ваших настроек и т. д.)

Подробнее о концепциях Kafka Stream здесь

Затем я перебираю потребителя (метод опроса), чтобы получить все сообщения и остановиться, когда записи потребителя станут пустыми

Kafka - это платформа потоковой передачи сообщений.Любые передаваемые вами данные постоянно обновляются, и вам, вероятно, не следует использовать их так, как вы ожидаете, что потребление прекратится после определенного количества сообщений.Как вы будете обрабатывать, если новое сообщение приходит после того, как вы остановите потребителя?

Кроме того, причина, по которой вы получаете нулевые записи, возможно, связана с записями, находящимися в разных разделах и т. Д.

ЧтоВаш конкретный вариант использования здесь ?, Возможно, есть хороший способ сделать это с помощью самой семантики Kafka.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...