У меня есть этот вариант использования для автоматизированного задания SparkSQL, где я хочу сделать это:
Считать таблицу (назовем это table1) из Феникса, используя Spark, и собрать в DataFrame(давайте назовем это df1) все найденные отрицательные значения
Затем я хочу удалить записи из другой таблицы (table2), где значения из столбца находятся в df1 (подумал об этомзапрос JOIN, но я хотел знать, возможно ли это с помощью DataFrame, и есть ли API, использующий HBase и Spark DataFrames)
AFAIK Phoenix напрямую не поддерживает операции DELETE черезSpark (пожалуйста, исправьте меня, если я ошибаюсь, и если есть способ, который я с радостью хотел бы услышать об этом), поэтому я более склонен использовать HBase Spark API
Вот схема для более наглядного объяснения:
Вот некоторый код.
Сбор отрицательных значений в DataFrame:
// Collect negative values
val negativeValues = spark
.sqlContext
.phoenixTableAsDataFrame("phoenix.table1", Seq(), conf = hbaseConf)
.select('COLUMN1)
.where('COLUMN2.lt(0))
// Send the query
[...]
Удалить значения из таблицы 2, гдеCOLUMN1 имеет отрицательное значение, поэтому что-то вроде этого в SQL (и если возможно непосредственно применить IN к DF):
DELETE FROM table2 WHERE COLUMN1 IN negativeValues
Мой ожидаемый результат будет следующим:
table1
column1 | column2
|
123456 | 123
234567 | 456
345678 | -789
456789 | 012
567891 | -123
table2
column1 | column2
|
123456 | 321
234567 | 654
345678 | 945 <---- same column1 as table1's, so delete
456789 | 987
567891 | 675 <---- same column1 as table1's, so delete
Итак, в конечном итоге, я хотел бы знать, есть ли способ отправить этот запрос DELETE в HBase через Spark без особых хлопот.
Спасибо.