Можно ли использовать сообщения кафки, используя ключ и раздел? - PullRequest
1 голос
/ 06 ноября 2019

Я использую kafka_2.12 версия 2.3.0 , где я публикую данные в теме kafka, используя раздел и ключ. Мне нужно найти способ, с помощью которого я могу использовать конкретное сообщение из темы, используя комбинацию клавиш и разделов. Таким образом, мне не нужно будет потреблять все сообщения и повторять для получения правильного.

Сейчас я могу сделать это только

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props)
consumer.subscribe(Collections.singletonList("topic"))
ConsumerRecords<String, String> records = consumer.poll(100)
def data = records.findAll {
    it -> it.key().equals(key)
}

Ответы [ 2 ]

1 голос
/ 06 ноября 2019

Существует два способа использования темы / разделов:

  1. KafkaConsumer.assign (): Ссылка на документ
  2. KafkaConsumer.subscribe (): Ссылка на документ

Итак, вы не можете получать сообщения по ключу.

Если у вас нет плана по расширению разделов, рассмотрите возможность использования assign ()метод. Потому что все сообщения, которые приходят с определенным ключом, будут идти в один и тот же раздел.

Как использовать:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
TopicPartition partition = new TopicPartition("some-topic", 0);
consumer.assign(Arrays.asList(partition));

while(true){
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    String data = records.findAll {
        it -> it.key().equals(key)
    }
}
1 голос
/ 06 ноября 2019

Вы не можете "получать сообщения по ключу от Kafka".

Одним из решений, если это целесообразно, было бы иметь столько разделов, сколько ключей, и всегда направлять сообщения для ключа в один и тот же раздел.

Ключ сообщения в качестве раздела

kafkaConsumer.assign(topicPartitions);
    kafkaConsumer.seekToBeginning(topicPartitions);

    // Pull records from kafka, keep polling until we get nothing back
    final List<ConsumerRecord<byte[], byte[]>> allRecords = new ArrayList<>();
    ConsumerRecords<byte[], byte[]> records;
    do {
        // Grab records from kafka
        records = kafkaConsumer.poll(2000L);
        logger.info("Found {} records in kafka", records.count());

        // Add to our array list
        records.forEach(allRecords::add);

    }
    while (!records.isEmpty());

Доступ к сообщениям темы с использованием только имени темы

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList(<Topic Name>,<Topic Name>));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...