Ускорение топологии шторма - PullRequest
0 голосов
/ 25 декабря 2018

У нас есть приложение, которое перемещает CSV-файлы из HDFS в Hive.Мы используем Storm Topology для этого процесса.

8 машин были использованы.Каждое из них имеет 22 ядра и 512 ГБ оперативной памяти.Тем не менее, наш код работает очень медленно.Для передачи 6 миллионов данных требуется 10 минут.

10 МБ из 60 файлов передаются в HDFS за одну секунду.Мы пытаемся оптимизировать наш код, но очевидно, что мы делаем что-то очень неправильное.

Для таблицы Hive у нас есть 64 сегмента.

В нашей топологии у нас есть 1 Spout и2 болта.В основном наш Spout получает файл CSV, отправляет строки первому Bolt, который отвечает за анализ данных, а затем отправляет Bolt второму Bolt, который отвечает за процесс HDFS.

HDFS Spout;

HdfsSpout hdfsSpout = new HdfsSpout()
    .withOutputFields(TextFileReader.defaultFields)
    .setReaderType("text")
    .setHdfsUri(hdfsUri)
    .setSourceDir("/data/in")
    .setArchiveDir("/data/done")
    .setBadFilesDir("/data/bad")
    .setClocksInSync(true) // NTP installed on all hosts
    .setIgnoreSuffix("_COPYING_") 
// do not begin reading file until it is completely copied to HDFS
    .setMaxOutstanding(50_000);

Mapper;

DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
    .withColumnFields(new Fields(TTDPIRecord.fieldsList))
    .withPartitionFields(new Fields(TTDPIRecord.partitionFieldsList));

Параметры Hive;

HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
    .withAutoCreatePartitions(true)
    .withHeartBeatInterval(3)
    .withCallTimeout(10_000) // default = 10.000
    .withTxnsPerBatch(2)
    .withBatchSize(50_000) 
// doing below because its affecting storm metrics most likely
    .withTickTupleInterval(1);

Config;

Config conf = new Config();
conf.setNumWorkers(6);
conf.setNumAckers(6);
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);

Построитель топологий;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("hdfsSpout", hdfsSpout, 8);
builder.setBolt("recordParserBolt", recordParserBolt, 8).localOrShuffleGrouping("hdfsSpout");
builder.setBolt("hiveBolt", hiveBolt, 8).localOrShuffleGrouping("recordParserBolt");

Мы не уверены со следующими параметрами:

в HDFS Spout;.setMaxOutstanding (50_000);

в опциях Hive Spout;.withTxnsPerBatch (2) .withBatchSize (50_000) .withTickTupleInterval (1);

в Config;.setNumWorkers (6);.setNumAckers (6);

Параллельность в носике и болте;Мы дали 8 для каждого.

Какими должны быть значения для этих параметров?Заранее спасибо.

Редактировать;Вот наш результат теста для 10 МБ из 100 CSV-файлов;

hdfsSpout Исполнители: 8 Полная латентность: 1834,209 мс

recordParserBolt Исполнители: 8 полная задержка: 0,019 мс

hiveBolt Исполнители: 8 полная задержка: 1092,624 мс

1 Ответ

0 голосов
/ 30 декабря 2018

Вы делаете conf.setNumWorkers(6);, что означает, что вы используете только 6 из 8 машин, вы можете установить его на 8, чтобы использовать все имеющееся у вас оборудование.

Другой параметр, который вы можете изменить, - подсказка параллелизмаваших болтов, что означает начальный номер исполнителя (резьбы) компонента.Вы дали параллелизм только 8, вы можете увеличить его до 100/200 и посмотреть, как меняется производительность.

Вы можете пройти через это , чтобы понять, как параллелизм работает в шторм.

Можете ли вы также сказать, какова ваша конфигурация для max-spout-pending ?

...