Kafka Node - Как получить все сообщения на сжатую тему - PullRequest
0 голосов
/ 25 августа 2018

Я пытаюсь использовать kafka-node для чтения сжатых сообщений из темы kafka.

Проблема заключается в том, что недавно вставленные сообщения остаются выше EOL и недоступны до тех пор, пока не будут добавлены дополнительные сообщения.Фактически, между EOL и смещением высокой воды существует разрыв, который предотвращает чтение последних сообщений.Непонятно, почему это так.

Тема была создана с

kafka-topics.sh --zookeeper ${KAFKA_HOST}:2181 --create --topic atopic --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0" --partitions 1 --replication-factor 1

В тему вводятся несколько ключевых значений.Некоторые ключи были одинаковыми.

var client = new kafka.KafkaClient({kafkaHost: "<host:port>",autoConnect: true})
var producer = new HighLevelProducer(client);
  producer.send(payload, function(error, result) {
  debug('Sent payload to Kafka: ', payload);
  if (error) {
    console.error(error);
  } else {
   res(true)
  }
  client.close()
 });
});

Здесь вставлены ключи и значения

key - 1
key2 - 1
key3 - 1
key - 2
key2 - 2
key3 - 2
key1 - 3
key - 3
key2 - 3
key3 - 3

Затем был запрошен набор ключей темы.

var options = {
        id: 'consumer1',
        kafkaHost: "<host:port>",
        groupId: "consumergroup1",
        sessionTimeout: 15000,
        protocol: ['roundrobin'],
        fromOffset: 'earliest'
      };
      var consumerGroup = new ConsumerGroup(options, topic);
        consumerGroup.on('error', onError);
        consumerGroup.on('message', onMessage);
        consumerGroup.on('done', function(message) {
          consumerGroup.close(true,function(){ });
        })
        function onError (error) {
          console.error(error);
        }
        function onMessage (message) {)
            console.log('%s read msg Topic="%s" Partition=%s Offset=%d HW=%d', this.client.clientId, message.topic, message.partition, message.offset, message.highWaterOffset, message.value);
        }
      })
Результаты удивительны:
consumer1 read msg Topic="atopic" Partition=0 Offset=4 highWaterOffset=10 Key=key2 value={"name":"key2","url":"2"}
consumer1 read msg Topic="atopic" Partition=0 Offset=5 highWaterOffset=10 Key=key3 value={"name":"key3","url":"2"}
consumer1 read msg Topic="atopic" Partition=0 Offset=6 highWaterOffset=10 Key=key1 value={"name":"key1","url":"3"}
consumer1 read msg Topic="atopic" Partition=0 Offset=7 highWaterOffset=10 Key=key value={"name":"key","url":"3"}
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=

Существует сильное смещение воды, которое представляет самое последнее значение 10. Однако значение смещения, которое видит потребитель, составляет только 7. Каким-то образомсжатие не позволяет потребителю видеть последние сообщения.

Непонятно, как избежать этого ограничения и разрешить потребителю видеть последние сообщения.

Любые предложения приветствуются.Спасибо.

Ответы [ 2 ]

0 голосов
/ 04 ноября 2018

После более продолжительной работы с kafka кажется, что api-узел kafka имеет следующее поведение (которое, я думаю, на самом деле происходит от самой kafka).

Когда сообщения запрашиваются до highWaterOff, тогда только сообщения поднимаютсяв highWaterOffset возвращаются в ConsumerGroup.Это имеет смысл, если сообщения не были реплицированы, потому что другой потребитель в группе не обязательно будет видеть эти сообщения.

Все еще возможно запрашивать и получать сообщения за пределами highWaterOffset, используя Consumer, а не ConsumerGroup, изапрос определенного раздела.

Кроме того, событие 'done', похоже, срабатывает, когда смещение не обязательно в последнемOffset.В этом случае необходимо отправить дополнительный запрос по адресу message.offset + 1.Если вы продолжите делать это, вы можете получить все сообщения до последней версии Offset.

Мне не понятно, почему у kafka такое поведение, но, возможно, есть некоторая детализация более низкого уровня, которая отображает это возникающее поведение.

0 голосов
/ 26 августа 2018

Каким-то образом уплотнение не позволяет потребителю видеть последние сообщения.

Да, вам не хватает нескольких сообщений, но вы также видите и другие.

Сжатие удаляет предыдущие ключи.

Обратите внимание, что вообще нет url - 1 значений

Key=key2 value={"name":"key2","url":"2"}
Key=key3 value={"name":"key3","url":"2"}
Key=key1 value={"name":"key1","url":"3"}
Key=key value={"name":"key","url":"3"}

Это потому, что вы отправили новые значения для того же ключа.

И вы отправили 10 сообщений, поэтому максимальный сдвиг по теме составляет 10

Ваш код не обязательно выглядит неправильно, но вам нужно иметь еще два значения.Смещения, которые печатаются, соответствуют этой логике.

<code><strike>key  - 1 | 0</strike>
<strike>key2 - 1 | 1</strike>
<strike>key3 - 1 | 2</strike>
<strike>key  - 2 | 3</strike>
<strike>key2 - 2 | 4</strike>
<strike>key3 - 2 | 5</strike>
key1 - 3 | 6
key  - 3 | 7
key2 - 3 | 8
key3 - 3 | 9

Как правило, я бы предложил, чтобы Kafka не пытался сжимать тему и записывать сегменты журнала 10 раз в секунду, а также использовать различные библиотеки, такие как node-rdkafka

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...