Я действительно столкнулся с проблемой с моей таблицей HBase, которая развивается каждый день. Я использую ядро 10 shc для чтения таблицы в виде DataFrame и храню каждый столбец как тип STRING
. То, что я пытаюсь сделать, это читать каждый день только последние 30 дней данных. Учитывая, что таблица развивается каждый день, для загрузки всей таблицы и применения фильтров требуется более 6 часов.
Я использую shc версии 1.1.1-2.1-s_2.11
Что я делаю, это:
val df = withCatalog(catalog)
df.filter($"date > "20191201" && $"date < "20200101")
Когда я делаю explain()
, это показывает мне это на физическом плане
== Physical Plan ==
*Filter (isnotnull(ip#16) && (ip#16 >= 4F62B402))
+- *Filter <function1>.apply
+- *Scan HBaseRelation(Map(catalog ->....
Я не вижу PushedFilters: [...]
что означает, что нет предиката, который был передан вниз.
Кто-нибудь, пожалуйста, может помочь мне в этом вопросе?
РЕДАКТИРОВАТЬ после shay__ рекомендуя
Я добавил:
Logger.getLogger("org").setLevel(Level.DEBUG)
Logger.getLogger("akka").setLevel(Level.DEBUG)
И в моих журналах пряжи я получил:
DEBUG HBaseFilter: для всех фильтров:
DEBUG HBaseFilter: ret: DEBUG
HBaseFilter: start: None end: None
Это означает, что на основе приведенного ниже кода не было фильтров, переведенных в HbaseFilter, поэтому нет фильтров с понижением, хотя в физическом плане мы можем видеть что фильтр существует.
https://github.com/hortonworks-spark/shc/blob/76673b40b3ce370d0fbc4614938cbe54cfffa082/core/src/main/scala/org/apache/spark/sql/execution/datasources/hbase/HBaseFilter.scala#L127