Как отправить запрос DELETE на HBase через Spark Job - PullRequest
1 голос
/ 04 апреля 2019

У меня есть этот вариант использования для автоматизированного задания SparkSQL, где я хочу сделать это:

  1. Считать таблицу (назовем это table1) из Феникса, используя Spark, и собрать в DataFrame(давайте назовем это df1) все найденные отрицательные значения

  2. Затем я хочу удалить записи из другой таблицы (table2), где значения из столбца находятся в df1 (подумал об этомзапрос JOIN, но я хотел знать, возможно ли это с помощью DataFrame, и есть ли API, использующий HBase и Spark DataFrames)

  3. AFAIK Phoenix напрямую не поддерживает операции DELETE черезSpark (пожалуйста, исправьте меня, если я ошибаюсь, и если есть способ, который я с радостью хотел бы услышать об этом), поэтому я более склонен использовать HBase Spark API


Вот схема для более наглядного объяснения:

schema


Вот некоторый код.

Сбор отрицательных значений в DataFrame:

// Collect negative values
val negativeValues = spark
  .sqlContext
  .phoenixTableAsDataFrame("phoenix.table1", Seq(), conf = hbaseConf)
  .select('COLUMN1)
  .where('COLUMN2.lt(0))

// Send the query
[...]

Удалить значения из таблицы 2, гдеCOLUMN1 имеет отрицательное значение, поэтому что-то вроде этого в SQL (и если возможно непосредственно применить IN к DF):

DELETE FROM table2 WHERE COLUMN1 IN negativeValues

Мой ожидаемый результат будет следующим:

table1

column1 |   column2
        |
123456  |   123
234567  |   456
345678  |   -789
456789  |   012
567891  |   -123



table2

column1 |   column2
        |
123456  |   321
234567  |   654
345678  |   945 <---- same column1 as table1's, so delete
456789  |   987
567891  |   675 <---- same column1 as table1's, so delete

Итак, в конечном итоге, я хотел бы знать, есть ли способ отправить этот запрос DELETE в HBase через Spark без особых хлопот.

Спасибо.

1 Ответ

0 голосов
/ 05 апреля 2019

вам нужно создать пользовательский API, если необходимо выполнить запрос «DELETE» от spark через Phoenix (sql engine) до Hbase.

Можно использовать следующий подход,

  1. получить столбец table2 rowkey из исходного кадра данных, чтобы выполнить удаление (для таблицы 2).
  2. создать код для работы с каждым разделом исходного фрейма данных и создать запрос «DELETE». скажем, запрос "DELETE FROM table2 WHERE column1 =?", подготовьте его и выполните как пакет с правильным размером пакета, который вы видите. так как мы выполняем его параллельно на каждом разделе фрейма данных, количество разделов в исходном фрейме данных приводит к параллелизму. так что вы можете попробовать переразбить его на нужный размер, чтобы увидеть правильные показатели производительности.

Если вы хотите пропустить движок sql, вы также можете использовать прямой API spark-hbase. Вот один такой пример - https://github.com/tmalaska/SparkOnHBase/blob/master/src/main/scala/org/apache/hadoop/hbase/spark/example/hbasecontext/HBaseBulkDeleteExample.scala

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...