Как масштабировать мое приложение от Kafka до Hbase, используя Spark с драйвером Phoenix? - PullRequest
0 голосов
/ 17 апреля 2019

В настоящее время я использую драйвер Phoenix для HBASE, используя искру от Kafka Я пишу в соленый стол с составными ключами строк, имеющими следующую структуру

[val_str0, val_str1, val_str2, val_str3, val_str4, val_str5, val_str6, val_str7, val_str8, отметка времени, значение]

Я пытаюсь ускорить мой приемный конвейер. В настоящее время запись в HBase является самой медленной частью.

Для теста я использовал 10 000 000 записей

Я пробовал разные интервалы между партиями от 45 до 120 секунд. Время обработки увеличивается с увеличением количества пакетов (имеет смысл, потому что потоковые данные больше для больших пакетов)

  • Дело 1
    Spark Submit Автономный
    Тема - 4 раздела -> ~ 6,6 мин
    Количество исполнителей = 4, количество ядер на исполнителя = 1
  • Дело 2 Кластер пряжи
    Тема - 8 разделов -> ~ 5,8 мин.
    Количество исполнителей = 8, количество ядер на исполнителя = 1
  • Дело 3
    Кластер пряжи
    Тема - 12 разделов -> ~ 6,2 минут
    Количество исполнителей = 12, количество ядер на исполнителя = 1

Я хочу иметь возможность сократить это время до 2,5 минут (скорость приема внутрь). Есть ли способ?
Будет ли лучше иметь 2 темы с отдельными заданиями потоковой передачи для каждой темы?
Также будет ли HBaseConnecter поглощать слияние быстрее, чем мой нынешний способ?

def main(args: Array[String]) {
  val logger = LoggerFactory.getLogger(getClass);
  val sc = new SparkContext(new SparkConf().setAppName("KafkaSparkPheonix"))
  val ssc = new StreamingContext(sc, Seconds(90))
  val kafkaParams = Map[String, String]("metadata.broker.list" -> "XXXXXXXXXXX", "group.id" -> "kafka_phoenix")
  val topics = Array("Ingest").toSet
  val directKafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)


  directKafkaStream.foreachRDD(
    rdd => {
      rdd.foreachPartition(rows => {
        val phoenixConn = new MyPhoenixConnection();
        rows.foreach(data => {
          val record = parse(data._2)
          phoenixConn.writeData(record) //I am using 25000 record batches to write here
        })
        if(phoenixConn.count>0){
          phoenixConn.executeAndCommit()
        }
        phoenixConn.closeConnection()
      })
    }
  )

Спасибо за ваши ответы!

...