Агрегация занимает слишком много времени с использованием spark over hbase (используя shc 1.1.1-2.1-s_2.11) - PullRequest
0 голосов
/ 23 октября 2018

Я принимаю потоковые данные в hbase.Я предварительно разделил таблицу HBase по разделам Kafka.Используя составной rowkey с комбинацией разделов Kafka, отметка времени и другие столбцы делают его уникальным.При таком подходе у меня действительно хорошая пропускная способность при вставке данных, но я также должен агрегировать данные ежедневно, что очень медленно.

Мы заметили, что количество задач Spark, запущенных для groupByили count равно общему количеству регионов, в которых распространяется моя таблица.

Я что-то здесь не так делаю?Как я могу ограничить количество областей для таблицы в HBase?

Инструкция создания HBase

create 'default:test', {NAME => 'data', VERSIONS => 1, TTL => '3888000'},{SPLITS=> ['10000000000000000000000000000000','20000000000000000000000000000000','30000000000000000000000000000000','40000000000000000000000000000000','50000000000000000000000000000000','60000000000000000000000000000000','70000000000000000000000000000000','80000000000000000000000000000000','90000000000000000000000000000000']}

каталог при вставке

def catalog = s"""{
  |"table":{"namespace":"default", "name": "test", "tableCoder":"Phoenix"},
  |"rowkey":"key",
  |"columns":{
  |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
  |"resource_id":{"cf":"data", "col":"resource_id", "type":"string"},
  |"resource_name":{"cf":"data", "col":"resource_name", "type":"string"},
  |"parent_id":{"cf":"data", "col":"parent_id", "type":"string"},  
  |"parent_name":{"cf":"data", "col":"parent_name", "type":"string"},
  |"id":{"cf":"data", "col":"id", "type":"string"},
  |"name":{"cf":"data", "col":"name", "type":"string"},
  |"timestamp":{"cf":"data", "col":"timestamp", "type":"string"},
  |"readable_timestamp":{"cf":"data", "col":"readable_timestamp", 
    "type":"string"},
  |"value":{"cf":"data", "col":"value", "type":"string"},
  |"partition":{"cf":"data", "col":"partition", "type":"string"}
  |}
|}""".stripMargin

Каталог для чтения

def catalog = s"""{
    |"table":{"namespace":"default", "name": "test", "tableCoder":"Phoenix"},
    |"rowkey":"partition:timestamp:id:parent_id:resource_id",
    |"columns":{   
    |"partition":{"cf":"rowkey", "col":"partition", "type":"string"},
    |"timestamp":{"cf":"rowkey", "col":"timestamp", "type":"string"},
    |"id":{"cf":"rowkey", "col":"id", "type":"string"},
    |"parent_id":{"cf":"rowkey", "col":"parent_id", "type":"string"},
    |"resource_id":{"cf":"rowkey", "col":"resource_id", "type":"string"},
    |"resource_name":{"cf":"data", "col":"resource_name", "type":"string"}, 
    |"parent_name":{"cf":"data", "col":"parent_name", "type":"string"},
    |"name":{"cf":"data", "col":"name", "type":"string"},
    |"value":{"cf":"data", "col":"value", "type":"string"},
    |"readable_timestamp":{"cf":"data", "col":"readable_timestamp", "type":"string"}
    |}
    |}""".stripMargin

Для сканирования диапазона я использую все разделы и временной диапазон:

val endrow = "1540080000000"
val startrow = "1539993600000"

df.filter(($"partition"==="0" && ($"timestamp" >= startrow && $"timestamp" <= endrow)) ||($"partition"==="1" && ($"timestamp" >= startrow && $"timestamp" <= endrow))||($"partition"==="2" && ($"timestamp" >= startrow && $"timestamp" <= endrow))||($"partition"==="3" && ($"timestamp" >= startrow && $"timestamp" <= endrow))||($"partition"==="4" && ($"timestamp" >= startrow && $"timestamp" <= endrow))||($"partition"==="5" && ($"timestamp" >= startrow && $"timestamp" <= endrow))||($"partition"==="6" && ($"timestamp" >= startrow && $"timestamp" <= endrow))||($"partition"==="7" && ($"timestamp" >= startrow && $"timestamp" <= endrow))||($"partition"==="8" && ($"timestamp" >= startrow && $"timestamp" <= endrow))||($"partition"==="9" && ($"timestamp" >= startrow && $"timestamp" <= endrow))).count

Приведенный выше фильтр приводит к 830 задачам, что равно количеству регионов.Который занимает слишком много времени.Как я могу улучшить это?

...