У нас есть приложение, которое перемещает CSV-файлы из HDFS в Hive.Мы используем Storm Topology для этого процесса.
8 машин были использованы.Каждое из них имеет 22 ядра и 512 ГБ оперативной памяти.Тем не менее, наш код работает очень медленно.Для передачи 6 миллионов данных требуется 10 минут.
10 МБ из 60 файлов передаются в HDFS за одну секунду.Мы пытаемся оптимизировать наш код, но очевидно, что мы делаем что-то очень неправильное.
Для таблицы Hive у нас есть 64 сегмента.
В нашей топологии у нас есть 1 Spout и2 болта.В основном наш Spout получает файл CSV, отправляет строки первому Bolt, который отвечает за анализ данных, а затем отправляет Bolt второму Bolt, который отвечает за процесс HDFS.
HDFS Spout;
HdfsSpout hdfsSpout = new HdfsSpout()
.withOutputFields(TextFileReader.defaultFields)
.setReaderType("text")
.setHdfsUri(hdfsUri)
.setSourceDir("/data/in")
.setArchiveDir("/data/done")
.setBadFilesDir("/data/bad")
.setClocksInSync(true) // NTP installed on all hosts
.setIgnoreSuffix("_COPYING_")
// do not begin reading file until it is completely copied to HDFS
.setMaxOutstanding(50_000);
Mapper;
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
.withColumnFields(new Fields(TTDPIRecord.fieldsList))
.withPartitionFields(new Fields(TTDPIRecord.partitionFieldsList));
Параметры Hive;
HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
.withAutoCreatePartitions(true)
.withHeartBeatInterval(3)
.withCallTimeout(10_000) // default = 10.000
.withTxnsPerBatch(2)
.withBatchSize(50_000)
// doing below because its affecting storm metrics most likely
.withTickTupleInterval(1);
Config;
Config conf = new Config();
conf.setNumWorkers(6);
conf.setNumAckers(6);
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
Построитель топологий;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("hdfsSpout", hdfsSpout, 8);
builder.setBolt("recordParserBolt", recordParserBolt, 8).localOrShuffleGrouping("hdfsSpout");
builder.setBolt("hiveBolt", hiveBolt, 8).localOrShuffleGrouping("recordParserBolt");
Мы не уверены со следующими параметрами:
в HDFS Spout;.setMaxOutstanding (50_000);
в опциях Hive Spout;.withTxnsPerBatch (2) .withBatchSize (50_000) .withTickTupleInterval (1);
в Config;.setNumWorkers (6);.setNumAckers (6);
Параллельность в носике и болте;Мы дали 8 для каждого.
Какими должны быть значения для этих параметров?Заранее спасибо.
Редактировать;Вот наш результат теста для 10 МБ из 100 CSV-файлов;
hdfsSpout Исполнители: 8 Полная латентность: 1834,209 мс
recordParserBolt Исполнители: 8 полная задержка: 0,019 мс
hiveBolt Исполнители: 8 полная задержка: 1092,624 мс