Я пытаюсь использовать kafka-node для чтения сжатых сообщений из темы kafka.
Проблема заключается в том, что недавно вставленные сообщения остаются выше EOL и недоступны до тех пор, пока не будут добавлены дополнительные сообщения.Фактически, между EOL и смещением высокой воды существует разрыв, который предотвращает чтение последних сообщений.Непонятно, почему это так.
Тема была создана с
kafka-topics.sh --zookeeper ${KAFKA_HOST}:2181 --create --topic atopic --config "cleanup.policy=compact" --config "delete.retention.ms=100" --config "segment.ms=100" --config "min.cleanable.dirty.ratio=0" --partitions 1 --replication-factor 1
В тему вводятся несколько ключевых значений.Некоторые ключи были одинаковыми.
var client = new kafka.KafkaClient({kafkaHost: "<host:port>",autoConnect: true})
var producer = new HighLevelProducer(client);
producer.send(payload, function(error, result) {
debug('Sent payload to Kafka: ', payload);
if (error) {
console.error(error);
} else {
res(true)
}
client.close()
});
});
Здесь вставлены ключи и значения
key - 1
key2 - 1
key3 - 1
key - 2
key2 - 2
key3 - 2
key1 - 3
key - 3
key2 - 3
key3 - 3
Затем был запрошен набор ключей темы.
var options = {
id: 'consumer1',
kafkaHost: "<host:port>",
groupId: "consumergroup1",
sessionTimeout: 15000,
protocol: ['roundrobin'],
fromOffset: 'earliest'
};
var consumerGroup = new ConsumerGroup(options, topic);
consumerGroup.on('error', onError);
consumerGroup.on('message', onMessage);
consumerGroup.on('done', function(message) {
consumerGroup.close(true,function(){ });
})
function onError (error) {
console.error(error);
}
function onMessage (message) {)
console.log('%s read msg Topic="%s" Partition=%s Offset=%d HW=%d', this.client.clientId, message.topic, message.partition, message.offset, message.highWaterOffset, message.value);
}
})
Результаты удивительны:
consumer1 read msg Topic="atopic" Partition=0 Offset=4 highWaterOffset=10 Key=key2 value={"name":"key2","url":"2"}
consumer1 read msg Topic="atopic" Partition=0 Offset=5 highWaterOffset=10 Key=key3 value={"name":"key3","url":"2"}
consumer1 read msg Topic="atopic" Partition=0 Offset=6 highWaterOffset=10 Key=key1 value={"name":"key1","url":"3"}
consumer1 read msg Topic="atopic" Partition=0 Offset=7 highWaterOffset=10 Key=key value={"name":"key","url":"3"}
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
consumer1 read msg Topic="atopic" Partition=0 Offset=0 highWaterOffset=10 Key= value=
Существует сильное смещение воды, которое представляет самое последнее значение 10. Однако значение смещения, которое видит потребитель, составляет только 7. Каким-то образомсжатие не позволяет потребителю видеть последние сообщения.
Непонятно, как избежать этого ограничения и разрешить потребителю видеть последние сообщения.
Любые предложения приветствуются.Спасибо.