В настоящее время я использую драйвер Phoenix для HBASE, используя искру от Kafka
Я пишу в соленый стол с составными ключами строк, имеющими следующую структуру
[val_str0, val_str1, val_str2, val_str3, val_str4, val_str5, val_str6,
val_str7, val_str8, отметка времени, значение]
Я пытаюсь ускорить мой приемный конвейер. В настоящее время запись в HBase является самой медленной частью.
Для теста я использовал 10 000 000 записей
Я пробовал разные интервалы между партиями от 45 до 120 секунд. Время обработки увеличивается с увеличением количества пакетов (имеет смысл, потому что потоковые данные больше для больших пакетов)
- Дело 1
Spark Submit Автономный
Тема - 4 раздела -> ~ 6,6 мин
Количество исполнителей = 4, количество ядер на исполнителя = 1
- Дело 2
Кластер пряжи
Тема - 8 разделов -> ~ 5,8 мин.
Количество исполнителей = 8, количество ядер на исполнителя = 1
- Дело 3
Кластер пряжи
Тема - 12 разделов -> ~ 6,2 минут
Количество исполнителей = 12, количество ядер на исполнителя = 1
Я хочу иметь возможность сократить это время до 2,5 минут (скорость приема внутрь). Есть ли способ?
Будет ли лучше иметь 2 темы с отдельными заданиями потоковой передачи для каждой темы?
Также будет ли HBaseConnecter поглощать слияние быстрее, чем мой нынешний способ?
def main(args: Array[String]) {
val logger = LoggerFactory.getLogger(getClass);
val sc = new SparkContext(new SparkConf().setAppName("KafkaSparkPheonix"))
val ssc = new StreamingContext(sc, Seconds(90))
val kafkaParams = Map[String, String]("metadata.broker.list" -> "XXXXXXXXXXX", "group.id" -> "kafka_phoenix")
val topics = Array("Ingest").toSet
val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
directKafkaStream.foreachRDD(
rdd => {
rdd.foreachPartition(rows => {
val phoenixConn = new MyPhoenixConnection();
rows.foreach(data => {
val record = parse(data._2)
phoenixConn.writeData(record) //I am using 25000 record batches to write here
})
if(phoenixConn.count>0){
phoenixConn.executeAndCommit()
}
phoenixConn.closeConnection()
})
}
)
Спасибо за ваши ответы!