Одинаковое значение смещения используется разными темами - PullRequest
0 голосов
/ 31 мая 2018

Наша топология использует KafkaSpout для извлечения сообщений из тем Кафки.У нас есть ~ 150 тем с ~ 12 разделами, 8 исполнителями шторма и задачами на 2 узлах шторма.Storm версии 1.0.5, брокеры Kafka версии 10.0.2, клиенты Kafka версии 0.9.0.1.Мы не удаляем темы Кафки.

В какой-то момент я наблюдал огромное количество повторяющихся предупреждений WARN в worker.log

2018-05-29 14: 36: 57.928 oaskKafkaUtilsThread-15-kafka-spout-executor [18 18] [WARN] Раздел {host1: 9092, topic = topic_1, partition = 10} Получил запрос на выборку со смещением вне диапазона: [9248]

2018-05-29 14: 36: 57.929 oaskKafkaUtils Thread-23-kafka-spout-executor [16 16] [WARN] Раздел {host = host2: 9092, topic = topic_2, partition = 0} Получил запрос на выборку со смещением вне диапазона:[22650006]

2018-05-29 14: 36: 57.930 oaskKafkaUtils Thread-23-kafka-spout-executor [16 16] [WARN] Раздел {хост = хост3: 9092, тема = тема_3, раздел =4} Получил запрос на выборку со смещением вне диапазона: [1011584]

2018-05-29 14: 36: 57.932 oaskKafkaUtils Thread-7-kafka-spout-executor [12 12] [WARN] Partition {host1: 9092, topic = topic4, partition = 4} Получил запрос на выборку со смещением вне диапазона: [9266]

2018-05-29 14: 36: 57.933 oaskKafkaUtils Thread-7-kafka-spout-executor [12 12] [WARN] Раздел {хост = хост2: 9092, тема = тема5, раздел = 4} Получен запрос на выборку со смещением вне диапазона: [9266]

2018-05-29 14: 36: 57.935 oaskKafkaUtils Thread-23-kafka-spout-executor [16 16] [WARN] Раздел {host1: 9092, topic = topic6, partition = 4} Получил запрос на выборку со смещением вне диапазона: [1011584]

2018-05-29 14: 36: 57.936 oaskKafkaUtils Thread-15-kafka-spout-executor [18 18] [WARN] Раздел {host = host2: 9092, topic = topic6, partition = 10}Получил запрос на выборку со смещением вне диапазона: [9248]

По какой-то причине одно и то же значение смещения константы использовалось для одного и того же раздела разных тем.

Я включил режим отладки иболее точно наблюдаемые файлы журнала.

2018-05-29 14: 37: 03.573 oaskPartitionManager Thread-7-kafka-spout-executor [12 12] [DEBUG] Написал последнее завершенное смещение (1572936) вZK для раздела {host = host3: 9092, topic = topic1, partition = 8} для топологии: topology1

2018-05-29 14:37:03.577 oaskPartitionManager Thread-7-kafka-spout-executor [12 12] [DEBUG] Написал последнее завершенное смещение (1572936) в ZK для раздела {host = host1: 9092, topic = topic2, partition = 8} для топологии: topology1

2018-05-29 14: 37: 03.578 oaskPartitionManager Thread-7-kafka-spout-executor [12 12] [DEBUG] Написал последнее завершенное смещение (1572936) в ZK для раздела {host = host2: 9092,topic = topic3, partition = 8} для топологии: topology1

2018-05-29 14: 38: 07.581 oaskPartitionManager Thread-23-kafka-spout-executor [16 16] [DEBUG] Написал последнее выполненное смещение (61292573) в ZK для раздела {host = host1: 9092, topic = topic4, partition = 8} для топологии: топология1

2018-05-29 14: 38: 07.582 oaskPartitionManager Thread-23-kafka-spout-executor [16 16] [DEBUG] Написал последнее завершенное смещение (61292573) в ZK для раздела {host = host2: 9092, topic = topic5, partition = 8} для топологии: topology1

2018-05-29 14: 38: 07.584 oaskPartitionManager Thread-23-kafka-spout-executor [16 16] [DEBUG] Написал последний cсмещение (61292573) в ZK для раздела {host = host3: 9092, topic = topic6, partition = 8} для топологии: топология1

Я заметил, что некоторая часть всех тем была разделена на две независимые группы.Каждая группа состояла из 31 темы.Все темы в каждой группе использовали одинаковое значение смещения для каждого раздела.Однако это значение не было постоянным и варьируется между 8 различными значениями.Каждое из этих 8 значений было правильным для определенной темы из группы.Более того, каждое из этих значений росло со временем, и все темы обновляли его синхронно.Большинство тем (55 из 62) из ​​каждой группы имели соответствующее ПРЕДУПРЕЖДЕНИЕ «Смещение или диапазон», но с постоянным значением.Другие 7 тем продолжали работать корректно без сообщений WARNING, но их значение смещения также менялось.

Я просмотрел исходный код storm-kafka и заметил, что флаг useStartOffsetTimeIfOffsetOutOfRange в нашем случае не работаетпотому что у нас нет неудачных кортежей и смещение кафки меньше _emittedToOffset.Таким образом, одно и то же сообщение WARN регистрируется снова и снова.

    } catch (TopicOffsetOutOfRangeException e) {
        offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
        // fetch failed, so don't update the fetch metrics

        //fix bug [STORM-643] : remove outdated failed offsets
        if (!processingNewTuples) {
            // For the case of EarliestTime it would be better to discard
            // all the failed offsets, that are earlier than actual EarliestTime
            // offset, since they are anyway not there.
            // These calls to broker API will be then saved.
            Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);

            // Omitted messages have not been acked and may be lost
            if (null != omitted) {
                _lostMessageCount.incrBy(omitted.size());
            }

            _pending.headMap(offset).clear();

            LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
        }

        if (offset > _emittedToOffset) {
            _lostMessageCount.incrBy(offset - _emittedToOffset);
            _emittedToOffset = offset;
            LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
        }

        return;
    }

Однако я не понимаю, как возможно, что _emittedToOffset получил одно и то же значение для разных тем.У вас, наверное, есть идеи, почему это может произойти?

1 Ответ

0 голосов
/ 20 июня 2018

В исходном коде storm-kafka есть ошибка, возникающая при сбое брокеров Kafka.Здесь соответствуют JIRA билет и запрос на получение с исправлением.

...