У нас проблема с параллелизмом задач внутри одной топологии. Нам не удается получить хорошую, свободную скорость обработки.
Мы используем Kafka и Storm для построения системы с различными топологиями, где данные обрабатываются в соответствии с цепочкой топологий, связанных с использованием тем Kafka.
Мы используем Kafka 1.0.0 и Storm 1.2.1.
Загрузка небольшого количества сообщений, около 2000 в день, однако каждая задача может занять довольно много времени. В частности, одна топология может занимать различное время для выполнения каждой задачи, обычно от 1 до 20 минут. При последовательной обработке пропускной способности недостаточно для обработки всех входящих сообщений. Все топологии и система Kafka установлены на одном компьютере (16 ядер, 16 ГБ ОЗУ).
Поскольку сообщения независимы и могут обрабатываться параллельно, мы пытаемся использовать параллельные возможности Storm для повышения пропускной способности.
Для этого топология была настроена следующим образом:
- 4 рабочих
- Подсказка параллелизма установлена на 10
- Размер сообщения при чтении из Кафки достаточно большой, чтобы прочитать около 8 задач в каждом сообщении.
- В темах Kafka используется коэффициент репликации = 1 и разделы = 10.
При такой конфигурации мы наблюдаем следующее поведение в этой топологии.
- Около 7-8 задач читаются в одной партии из Kafka по топологии Storm (размер задачи не фиксирован), максимальный размер сообщения 128 кБ.
- Около 4-5 заданий вычисляются одновременно. Работа более или менее равномерно распределяется между работниками. Некоторые рабочие берут 1 задание, другие берут 2 и обрабатывают их одновременно.
- Когда задачи завершаются, остальные задачи запускаются.
- Проблема с голоданием достигается, когда остается обработать только 1-2 задачи. Другие работники ждут простоя, пока все задачи не будут завершены.
- Когда все задачи завершены, сообщение подтверждается и отправляется в следующую топологию.
- Новая партия считывается с Kafka, и процесс начинается снова.
У нас есть два основных вопроса. Во-первых, даже с 4 работниками и 10 подсказками о параллелизме запускается только 4-5 задач. Во-вторых, больше не запускаются пакеты, пока идет работа, даже если это всего одна задача.
Проблема не в том, что у нас недостаточно работы, так как мы пытались вставить 2000 задач в начале, так что работы много.
Мы попытались увеличить параметр «maxSpoutsPending», ожидая, что топология будет считывать больше пакетов и ставить их в очередь в одно и то же время, но кажется, что они передаются внутри конвейера, а не обрабатываются одновременно.
Топология создается с использованием следующего кода:
private static StormTopology buildTopologyOD() {
//This is the marker interface BrokerHosts.
BrokerHosts hosts = new ZkHosts(configuration.getProperty(ZKHOSTS));
TridentKafkaConfig tridentConfigCorrelation = new TridentKafkaConfig(hosts, configuration.getProperty(TOPIC_FROM_CORRELATOR_NAME));
tridentConfigCorrelation.scheme = new RawMultiScheme();
tridentConfigCorrelation.fetchSizeBytes = Integer.parseInt(configuration.getProperty(MAX_SIZE_BYTES_CORRELATED_STREAM));
OpaqueTridentKafkaSpout spoutCorrelator = new OpaqueTridentKafkaSpout(tridentConfigCorrelation);
TridentTopology topology = new TridentTopology();
Stream existingObject = topology.newStream("kafka_spout_od1", spoutCorrelator)
.shuffle()
.each(new Fields("bytes"), new ProcessTask(), new Fields(RESULT_FIELD, OBJECT_FIELD))
.parallelismHint(Integer.parseInt(configuration.getProperty(PARALLELISM_HINT)));
//Create a state Factory to produce outputs to kafka topics.
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(kafkaProperties)
.withKafkaTopicSelector(new ODTopicSelector())
.withTridentTupleToKafkaMapper(new ODTupleToKafkaMapper());
existingObject.partitionPersist(stateFactory, new Fields(RESULT_FIELD, OBJECT_FIELD), new TridentKafkaUpdater(), new Fields(OBJECT_FIELD));
return topology.build();
}
и конфиг, созданный как:
private static Config createConfig(boolean local) {
Config conf = new Config();
conf.setMaxSpoutPending(1); // Also tried 2..6
conf.setNumWorkers(4);
return conf;
}
Есть ли что-то, что мы можем сделать для повышения производительности, увеличивая количество параллельных задач или / и избегая голодания при завершении обработки пакета?