Storm + Kafka не распараллеливается, как ожидалось - PullRequest
0 голосов
/ 08 марта 2019

У нас проблема с параллелизмом задач внутри одной топологии. Нам не удается получить хорошую, свободную скорость обработки.

Мы используем Kafka и Storm для построения системы с различными топологиями, где данные обрабатываются в соответствии с цепочкой топологий, связанных с использованием тем Kafka.

Мы используем Kafka 1.0.0 и Storm 1.2.1.

Загрузка небольшого количества сообщений, около 2000 в день, однако каждая задача может занять довольно много времени. В частности, одна топология может занимать различное время для выполнения каждой задачи, обычно от 1 до 20 минут. При последовательной обработке пропускной способности недостаточно для обработки всех входящих сообщений. Все топологии и система Kafka установлены на одном компьютере (16 ядер, 16 ГБ ОЗУ).

Поскольку сообщения независимы и могут обрабатываться параллельно, мы пытаемся использовать параллельные возможности Storm для повышения пропускной способности.

Для этого топология была настроена следующим образом:

  • 4 рабочих
  • Подсказка параллелизма установлена ​​на 10
  • Размер сообщения при чтении из Кафки достаточно большой, чтобы прочитать около 8 задач в каждом сообщении.
  • В темах Kafka используется коэффициент репликации = 1 и разделы = 10.

При такой конфигурации мы наблюдаем следующее поведение в этой топологии.

  • Около 7-8 задач читаются в одной партии из Kafka по топологии Storm (размер задачи не фиксирован), максимальный размер сообщения 128 кБ.
  • Около 4-5 заданий вычисляются одновременно. Работа более или менее равномерно распределяется между работниками. Некоторые рабочие берут 1 задание, другие берут 2 и обрабатывают их одновременно.
  • Когда задачи завершаются, остальные задачи запускаются.
  • Проблема с голоданием достигается, когда остается обработать только 1-2 задачи. Другие работники ждут простоя, пока все задачи не будут завершены.
  • Когда все задачи завершены, сообщение подтверждается и отправляется в следующую топологию.
  • Новая партия считывается с Kafka, и процесс начинается снова.

У нас есть два основных вопроса. Во-первых, даже с 4 работниками и 10 подсказками о параллелизме запускается только 4-5 задач. Во-вторых, больше не запускаются пакеты, пока идет работа, даже если это всего одна задача.

Проблема не в том, что у нас недостаточно работы, так как мы пытались вставить 2000 задач в начале, так что работы много.

Мы попытались увеличить параметр «maxSpoutsPending», ожидая, что топология будет считывать больше пакетов и ставить их в очередь в одно и то же время, но кажется, что они передаются внутри конвейера, а не обрабатываются одновременно.

Топология создается с использованием следующего кода:

private static StormTopology buildTopologyOD() {
    //This is the marker interface BrokerHosts.
    BrokerHosts hosts = new ZkHosts(configuration.getProperty(ZKHOSTS));
    TridentKafkaConfig tridentConfigCorrelation = new TridentKafkaConfig(hosts, configuration.getProperty(TOPIC_FROM_CORRELATOR_NAME));

    tridentConfigCorrelation.scheme = new RawMultiScheme();
    tridentConfigCorrelation.fetchSizeBytes = Integer.parseInt(configuration.getProperty(MAX_SIZE_BYTES_CORRELATED_STREAM));

    OpaqueTridentKafkaSpout spoutCorrelator = new OpaqueTridentKafkaSpout(tridentConfigCorrelation);

    TridentTopology topology = new TridentTopology();

    Stream existingObject = topology.newStream("kafka_spout_od1", spoutCorrelator)
            .shuffle()
            .each(new Fields("bytes"), new ProcessTask(), new Fields(RESULT_FIELD, OBJECT_FIELD))
            .parallelismHint(Integer.parseInt(configuration.getProperty(PARALLELISM_HINT)));

    //Create a state Factory to produce outputs to kafka topics.
    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
            .withProducerProperties(kafkaProperties)
            .withKafkaTopicSelector(new ODTopicSelector())
            .withTridentTupleToKafkaMapper(new ODTupleToKafkaMapper());

    existingObject.partitionPersist(stateFactory, new Fields(RESULT_FIELD, OBJECT_FIELD), new TridentKafkaUpdater(), new Fields(OBJECT_FIELD));

    return topology.build();
}

и конфиг, созданный как:

private static Config createConfig(boolean local) {
    Config conf = new Config();
    conf.setMaxSpoutPending(1); // Also tried 2..6
    conf.setNumWorkers(4);

    return conf;
}

Есть ли что-то, что мы можем сделать для повышения производительности, увеличивая количество параллельных задач или / и избегая голодания при завершении обработки пакета?

1 Ответ

0 голосов
/ 08 марта 2019

Я нашел старый пост от пользователей шторма от Натана Марца о настройке параллелизма для Trident:

Я рекомендую использовать функцию "name" для именования частей вашего потокатак что пользовательский интерфейс показывает, какие болты соответствуют каким разделам.

Trident объединяет операции в как можно меньшее количество болтов.Кроме того, он никогда не перераспределяет ваш поток, если вы не выполнили операцию, которая явно включает перераспределение (например, shuffle, groupBy, partitionBy, глобальное агрегирование и т. Д.).Это свойство Trident гарантирует, что вы можете контролировать порядок или порядок упорядочения вещей.Таким образом, в этом случае все до groupBy должно иметь такой же параллелизм, иначе Trident должен был бы перераспределить поток.И поскольку вы не сказали, что хотите перераспределить поток, он не может этого сделать.Вы можете получить различный параллелизм для носика в сравнении с последующим, введя операцию перераспределения, например:

stream.parallelismHint (1) .shuffle (). Each (…) .each (…).parallelismHint (3) .groupBy (…);

Я думаю, что вы, возможно, захотите установить параллелизм для вашего излива, а также для .each.

Что касается одновременной обработки нескольких пакетов,Вы правы в том, что именно для этого в Трайденте maxSpoutPending.Попробуйте проверить в интерфейсе Storm, что ваше максимальное ожидаемое значение расхода действительно получено.Также попробуйте включить ведение журнала отладки для MasterBatchCoordinator .По этим журналам вы сможете определить, находятся ли несколько партий в полете одновременно или нет.

Когда вы говорите, что несколько партий не обрабатываются одновременно, вы подразумеваете под ProcessTask?Имейте в виду, что одним из свойств Trident является то, что обновления состояний упорядочиваются в отношении пакетов.Если у вас есть, например, maxSpoutPending = 3 и пакет 1, 2 и 3 в полете, Trident не будет отправлять больше пакетов для обработки до тех пор, пока не будет записан пакет 1, после чего он отправит еще один.Таким образом, медленные партии могут блокировать испускание большего количества, даже если 2 и 3 полностью обработаны, они должны ждать 1, чтобы закончить и записать.

Если вам не нужно поведение Trident в пакетировании и упорядочении, вывместо этого можно попробовать обычный Storm.

Еще одна дополнительная заметка, но вы можете рассмотреть возможность перехода с storm-kafka на storm-kafka-client.Для этого вопроса это не важно, но вы не сможете выполнить обновление до Kafka 2.x, не выполнив этого, и вам будет проще выполнить миграцию состояния.

...