Я довольно новый в Кафке.
Для проведения стресс-тестирования моего кластера и создания опыта эксплуатации я создал два простых приложения Java: одно, которое многократно публикует сообщения в теме (последовательность целых чисел), и другое приложение, которое загружает всю тему (все записи) и проверяет, что последовательность завершена.Ожидается, что сообщения не будут потеряны из-за операций в кластере (перезапуск узла, замена узла, перенастройка разделов разделов и т. Д.).
Раздел «последовательность» состоит из двух разделов и коэффициента репликации 3.кластер состоит из 3-х виртуальных узлов (это для целей тестирования, следовательно, они работают на одной машине).Тема настроена на сохранение всех сообщений (retention.ms
установлено на -1
)
В настоящее время у меня есть две проблемы, которые мне трудно выяснить:
ЕслиЯ использую bin/kafka-console-consumer.sh --bootstrap-server kafka-test-server:9090,kafka-test-server:9091,kafka-test-server:9092 --topic sequence --from-beginning
Я вижу ВСЕ сообщения (даже не упорядоченные, как ожидалось), загруженные на консоль.С другой стороны, если я использую написанное мной приложение-потребитель, я вижу разные результаты, загружаемые в каждом цикле: https://i.stack.imgur.com/tMK10.png - В выводе консоли первая строка после делителя - это вызов records.partitions()
следовательно, записи только иногда извлекаются из обоих разделов.Почему и почему java-приложение не ведет себя как bin/kafka-console-consumer.sh
?
Когда тема становится большой, bin/kafka-console-consumer.sh
по-прежнему может показывать все сообщения, в то время как приложение можетзагрузить только около 18'000 сообщений.Я попытался поиграть с различными конфигурациями на стороне потребителя, но без прогресса.Опять же, вопрос в том, почему есть разница?
Заранее благодарен за любую подсказку!
Вот для справки.Обсуждались два приложения:
package ch.demo.toys;
import java.io.FileInputStream;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class SequenceProducer {
public static void main(String[] args) throws Exception {
Properties properties = new Properties();
properties.load(new FileInputStream("toy.properties"));
properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
properties.put("retries", "3");
properties.put("compression.type", "snappy");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1);
for (Integer sequence_i = 0; true; sequence_i++) {
try(Producer<Integer, String> producer = new KafkaProducer<>(properties)) {
ProducerRecord<Integer, String> record = new ProducerRecord<>("sequence", sequence_i, "Sequence number: " + String.valueOf(sequence_i));
Future<RecordMetadata> sendFuture = producer.send(record, (metadata, exception) -> {
System.out.println("Adding " + record.key() + " to partition " + metadata.partition());
if (exception != null) {
exception.printStackTrace();
}
});
}
Thread.sleep(200);
}
}
}
package ch.demo.toys;
import java.io.FileInputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
public class CarthusianConsumer {
private static Properties getProperties() throws Exception {
Properties properties = new Properties();
properties.load(new FileInputStream("toy.properties"));
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.IntegerDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class);
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, Integer.MAX_VALUE);
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60 * 1000);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "carthusian-consumer");
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 60 * 1000);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 1024 * 1024 * 1024);
return properties;
}
private static boolean checkConsistency(List<Integer> sequence) {
Collections.sort(sequence);
Iterator<Integer> iterator = sequence.iterator();
int control = 0;
while(iterator.hasNext()) {
int value = iterator.next();
if (value != control) {
System.out.println("");
System.out.println("Gap found:");
System.out.println("\tSequence: " + value);
System.out.println("\tControl: " + control);
return false;
}
control++;
}
System.out.print(".");
return true;
}
public static void main(String[] args) throws Exception {
// Step 1: create a base consumer object
Consumer<Integer, String> consumer = new KafkaConsumer<>(getProperties());
// Step 2: load topic configuration and build list of TopicPartitons
List<TopicPartition> topicPartitions = consumer
.partitionsFor("sequence")
.stream()
.parallel()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList());
while (true) {
List<Integer> sequence = new ArrayList<>();
for (TopicPartition topicPartition : topicPartitions) {
// Step 3. specify the topic-partition to "read" from
// System.out.println("Partition specified: " + topicPartition);
consumer.assign(Arrays.asList(topicPartition));
// Step 4. set offset at the beginning
consumer.seekToBeginning(Arrays.asList(topicPartition));
// Step 5. get all records from topic-partition
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
// System.out.println("\tCount: " + records.count());
// System.out.println("\tPartitions: " + records.partitions());
records.forEach(record -> { sequence.add(record.key()); });
}
System.out.println(sequence.size());
checkConsistency(sequence);
Thread.sleep(2500);
}
}
}