Как вставить данные из SparkR в Hbase в кластере hadoop - PullRequest
0 голосов
/ 18 октября 2018

Я ищу помощь в получении данных SparkR для прямой загрузки в HBase.Функция чтения работает, когда я могу читать данные из внешних таблиц Hive, используя SparkR (sparkR.session)

Выполнены шаги:

  • Создана таблица HBase (hbase_test1)
  • Создание внешней таблицы в Hive для сопоставления таблицы HBase в Hive (test1)

Код:

library(SparkR)

sc <- sparkR.session(master = "local",sparkEnvir = list(spark.driver.memory="2g",enableHiveSupport=TRUE))
sqlContext <- sparkR.session(sc)

df <- sql("show tables")
collect(df)

sdf <- sql("SELECT * from test1")

Здесь я стою.

Могу ли я записать данные напрямую в HBase напрямую из SparkR?К вашему сведению: мне нужно использовать SparkR для определенного кода ML.Результаты должны быть сохранены обратно в HBase.Обратите внимание, что я использую все инструменты с открытым исходным кодом.

1 Ответ

0 голосов
/ 19 октября 2018

Без дополнительного развертывания вы можете использовать Apache Spark - Apache HBase Connector .

Сначала вы должны включить пакет.Это можно сделать с помощью следующих параметров *

spark.jars.packages  com.hortonworks:shc-core:1.1.1-2.1-s_2.11
spark.jars.repositories http://repo.hortonworks.com/content/groups/public/

в вашем spark-defaults.conf или эквивалентных аргументах командной строки для spark-submit / SparkR

--packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 \
--repositories http://repo.hortonworks.com/content/groups/public/ 

Версия (s_2.11 выше) пакета должен соответствовать версии Scala, используемой для сборки Spark.

Теперь предположим, что вы определили свою таблицу как

create 'FooBar', 'Foo', 'Bar'

и хотите, чтобы вставка SparkR была эквивалентна:

put 'FooBar', '1000', 'Foo:Value', 'x1'
put 'FooBar', '1000', 'Bar:Value', 'y1'
put 'FooBar', '2000', 'Foo:Value', 'x2'
put 'FooBar', '2000', 'Bar:Value', 'y2'

Необходимо указать отображение каталога:

catalog = '{
  "table":{"namespace":"default", "name":"FooBar"},
  "rowkey":"key",
  "columns":{
  "rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
  "foo_value":{"cf":"Foo", "col":"Value", "type":"string"},
  "bar_value":{"cf":"Bar", "col":"Value", "type":"string"}
  }
}'

и таблицу ввода:

df <- createDataFrame(data.frame(
  rowkey = c("1000", "2000"), foo_value = c("x1", "x2"), bar_value = c("y1", "y2")
))

Наконец, вы можете применить write.ml со следующими параметрами:

write.df(df, 
   source = "org.apache.spark.sql.execution.datasources.hbase", 
   mode = "append", catalog = catalog)

Подробнее см. В официальном документе о соединителе .

Если вы не возражаете против дополнительных зависимостей, вы можете развернуть Apache Phoenix сопоставьте свои таблицы HBase (проверьте, например, PHOENIX-447 ), а затем используйте официальный соединитель или встроенный источник JDBC для записи ваших данных.

За дополнительную плату это обеспечит гораздо лучший пользовательский опыт.Например, если вы определили таблицу Phoenix как:

CREATE TABLE foobar (
  id VARCHAR NOT NULL PRIMARY KEY, 
  foo INTEGER, 
  bar VARCHAR
); 

, вы могли бы

SparkR:::callJStatic(
  "java.lang.Class", "forName",  
  "org.apache.phoenix.jdbc.PhoenixDriver"
)


df <- createDataFrame(data.frame(
  id = c("1000", "2000"), foo = c(1, 2), bar = c("x", "y")
))


write.df(
  dfr, source = "org.apache.phoenix.spark", 
  # Note that the only supported mode is `overwrite`, 
  # which in fact works like `UPSERT`
  mode = "overwrite",
  table = "FooBar", 
  # ZooKeeper URL
  zkUrl = "host:port"  
)

Подобно первому варианту, вы должны будете включить соответствующий разъем .Однако, в отличие от разъема HBase, он не является самодостаточным и требует использования jar-файлов Phoenix Core и Client на CLASSPATH.


* Не забудьте настроить версию пакета в будущем.

...