Не гарантирован ли порядок сообщений при использовании KafkaEmbedded? - PullRequest
0 голосов
/ 04 июля 2019

Я провел модульный тест, используя KafkaEmbeddedKafkaTemplate), но порядок сообщений является случайным. Кто-нибудь знает, логично ли это, и возможен ли порядок гарантии?

вот мой код:

public class KafkaTest {

  private static String TOPIC = "test.topic";

  @ClassRule
  public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TOPIC);

  @Test
  public void testEmbeddedKafkaSendOrder() throws Exception {
    Map<String, Object> producerConfig = new HashMap<>();
    producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

    KafkaTemplate<String, byte[]> kafkaTemplate = new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerConfig));
    kafkaTemplate.send(TOPIC, "TEST1".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST2".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST3".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST4".getBytes()).get();
    kafkaTemplate.send(TOPIC, "TEST5".getBytes()).get();

    Map<String, Object> consumerConfig = new HashMap<>();
    consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
    consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-test-group");
    consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
    consumerConfig.put("auto.offset.reset", "earliest");

    final Consumer<String, byte[]> consumer = new KafkaConsumer<>(consumerConfig);
    embeddedKafka.consumeFromAnEmbeddedTopic(consumer, TOPIC);
    ConsumerRecords<String, byte[]> records = consumer.poll(100L);

    // Tests
    final Iterator<ConsumerRecord<String, byte[]>> recordIterator = records.iterator();
    while (recordIterator.hasNext()) {
      System.out.println("received:" + new String(recordIterator.next().value()));
    }
  }

Например, этот код печатается (но порядок может измениться):

received:TEST2
received:TEST4
received:TEST1
received:TEST3
received:TEST5

1 Ответ

2 голосов
/ 04 июля 2019

В Kafka вы можете быть уверены, что порядок сообщений в том же разделе одинаков, но не в теме.

Note that as a topic typically has multiple partitions, there is
no guarantee of message time-ordering across the entire topic, just within a single
partition

Цитата из книги Kafka: The Definitive Guide: Real-Time Data and Stream Processing at Scale.Что вы можете сделать по этому поводу и как получать сообщения по порядку?Вариант 1:

        kafkaTemplate.send(TOPIC,"1", "TEST1".getBytes()).get();
        kafkaTemplate.send(TOPIC,"1", "TEST2".getBytes()).get();
        kafkaTemplate.send(TOPIC,"1", "TEST3".getBytes()).get();
        kafkaTemplate.send(TOPIC,"1", "TEST4".getBytes()).get();
        kafkaTemplate.send(TOPIC,"1", "TEST5".getBytes()).get();

Таким образом, для каждого значения вы посылаете один и тот же ключ «1».Кафка выберет раздел на основе вашего ключа.Поскольку все ключи равны, все сообщения будут отправляться в один и тот же раздел, и вы получите свои записи в следующем порядке.

Вариант 2: Инициализируйте KafkaEmbedded таким образом:

new KafkaEmbedded(1, true,1, TOPIC);

Таким образом, выГоворя Кафке, что для этой темы вы хотели бы иметь только один раздел, чтобы каждая запись шла в этот раздел.

...