Наша топология использует KafkaSpout
для извлечения сообщений из тем Кафки.У нас есть ~ 150 тем с ~ 12 разделами, 8 исполнителями шторма и задачами на 2 узлах шторма.Storm версии 1.0.5, брокеры Kafka версии 10.0.2, клиенты Kafka версии 0.9.0.1.Мы не удаляем темы Кафки.
В какой-то момент я наблюдал огромное количество повторяющихся предупреждений WARN в worker.log
2018-05-29 14: 36: 57.928 oaskKafkaUtilsThread-15-kafka-spout-executor [18 18] [WARN] Раздел {host1: 9092, topic = topic_1, partition = 10} Получил запрос на выборку со смещением вне диапазона: [9248]
2018-05-29 14: 36: 57.929 oaskKafkaUtils Thread-23-kafka-spout-executor [16 16] [WARN] Раздел {host = host2: 9092, topic = topic_2, partition = 0} Получил запрос на выборку со смещением вне диапазона:[22650006]
2018-05-29 14: 36: 57.930 oaskKafkaUtils Thread-23-kafka-spout-executor [16 16] [WARN] Раздел {хост = хост3: 9092, тема = тема_3, раздел =4} Получил запрос на выборку со смещением вне диапазона: [1011584]
2018-05-29 14: 36: 57.932 oaskKafkaUtils Thread-7-kafka-spout-executor [12 12] [WARN] Partition {host1: 9092, topic = topic4, partition = 4} Получил запрос на выборку со смещением вне диапазона: [9266]
2018-05-29 14: 36: 57.933 oaskKafkaUtils Thread-7-kafka-spout-executor [12 12] [WARN] Раздел {хост = хост2: 9092, тема = тема5, раздел = 4} Получен запрос на выборку со смещением вне диапазона: [9266]
2018-05-29 14: 36: 57.935 oaskKafkaUtils Thread-23-kafka-spout-executor [16 16] [WARN] Раздел {host1: 9092, topic = topic6, partition = 4} Получил запрос на выборку со смещением вне диапазона: [1011584]
2018-05-29 14: 36: 57.936 oaskKafkaUtils Thread-15-kafka-spout-executor [18 18] [WARN] Раздел {host = host2: 9092, topic = topic6, partition = 10}Получил запрос на выборку со смещением вне диапазона: [9248]
По какой-то причине одно и то же значение смещения константы использовалось для одного и того же раздела разных тем.
Я включил режим отладки иболее точно наблюдаемые файлы журнала.
2018-05-29 14: 37: 03.573 oaskPartitionManager Thread-7-kafka-spout-executor [12 12] [DEBUG] Написал последнее завершенное смещение (1572936) вZK для раздела {host = host3: 9092, topic = topic1, partition = 8} для топологии: topology1
2018-05-29 14:37:03.577 oaskPartitionManager Thread-7-kafka-spout-executor [12 12] [DEBUG] Написал последнее завершенное смещение (1572936) в ZK для раздела {host = host1: 9092, topic = topic2, partition = 8} для топологии: topology1
2018-05-29 14: 37: 03.578 oaskPartitionManager Thread-7-kafka-spout-executor [12 12] [DEBUG] Написал последнее завершенное смещение (1572936) в ZK для раздела {host = host2: 9092,topic = topic3, partition = 8} для топологии: topology1
2018-05-29 14: 38: 07.581 oaskPartitionManager Thread-23-kafka-spout-executor [16 16] [DEBUG] Написал последнее выполненное смещение (61292573) в ZK для раздела {host = host1: 9092, topic = topic4, partition = 8} для топологии: топология1
2018-05-29 14: 38: 07.582 oaskPartitionManager Thread-23-kafka-spout-executor [16 16] [DEBUG] Написал последнее завершенное смещение (61292573) в ZK для раздела {host = host2: 9092, topic = topic5, partition = 8} для топологии: topology1
2018-05-29 14: 38: 07.584 oaskPartitionManager Thread-23-kafka-spout-executor [16 16] [DEBUG] Написал последний cсмещение (61292573) в ZK для раздела {host = host3: 9092, topic = topic6, partition = 8} для топологии: топология1
Я заметил, что некоторая часть всех тем была разделена на две независимые группы.Каждая группа состояла из 31 темы.Все темы в каждой группе использовали одинаковое значение смещения для каждого раздела.Однако это значение не было постоянным и варьируется между 8 различными значениями.Каждое из этих 8 значений было правильным для определенной темы из группы.Более того, каждое из этих значений росло со временем, и все темы обновляли его синхронно.Большинство тем (55 из 62) из каждой группы имели соответствующее ПРЕДУПРЕЖДЕНИЕ «Смещение или диапазон», но с постоянным значением.Другие 7 тем продолжали работать корректно без сообщений WARNING, но их значение смещения также менялось.
Я просмотрел исходный код storm-kafka
и заметил, что флаг useStartOffsetTimeIfOffsetOutOfRange
в нашем случае не работаетпотому что у нас нет неудачных кортежей и смещение кафки меньше _emittedToOffset
.Таким образом, одно и то же сообщение WARN регистрируется снова и снова.
} catch (TopicOffsetOutOfRangeException e) {
offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
// fetch failed, so don't update the fetch metrics
//fix bug [STORM-643] : remove outdated failed offsets
if (!processingNewTuples) {
// For the case of EarliestTime it would be better to discard
// all the failed offsets, that are earlier than actual EarliestTime
// offset, since they are anyway not there.
// These calls to broker API will be then saved.
Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);
// Omitted messages have not been acked and may be lost
if (null != omitted) {
_lostMessageCount.incrBy(omitted.size());
}
_pending.headMap(offset).clear();
LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
}
if (offset > _emittedToOffset) {
_lostMessageCount.incrBy(offset - _emittedToOffset);
_emittedToOffset = offset;
LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
}
return;
}
Однако я не понимаю, как возможно, что _emittedToOffset
получил одно и то же значение для разных тем.У вас, наверное, есть идеи, почему это может произойти?