Без дополнительного развертывания вы можете использовать Apache Spark - Apache HBase Connector .
Сначала вы должны включить пакет.Это можно сделать с помощью следующих параметров *
spark.jars.packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11
spark.jars.repositories http://repo.hortonworks.com/content/groups/public/
в вашем spark-defaults.conf
или эквивалентных аргументах командной строки для spark-submit
/ SparkR
--packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 \
--repositories http://repo.hortonworks.com/content/groups/public/
Версия (s_2.11
выше) пакета должен соответствовать версии Scala, используемой для сборки Spark.
Теперь предположим, что вы определили свою таблицу как
create 'FooBar', 'Foo', 'Bar'
и хотите, чтобы вставка SparkR была эквивалентна:
put 'FooBar', '1000', 'Foo:Value', 'x1'
put 'FooBar', '1000', 'Bar:Value', 'y1'
put 'FooBar', '2000', 'Foo:Value', 'x2'
put 'FooBar', '2000', 'Bar:Value', 'y2'
Необходимо указать отображение каталога:
catalog = '{
"table":{"namespace":"default", "name":"FooBar"},
"rowkey":"key",
"columns":{
"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
"foo_value":{"cf":"Foo", "col":"Value", "type":"string"},
"bar_value":{"cf":"Bar", "col":"Value", "type":"string"}
}
}'
и таблицу ввода:
df <- createDataFrame(data.frame(
rowkey = c("1000", "2000"), foo_value = c("x1", "x2"), bar_value = c("y1", "y2")
))
Наконец, вы можете применить write.ml
со следующими параметрами:
write.df(df,
source = "org.apache.spark.sql.execution.datasources.hbase",
mode = "append", catalog = catalog)
Подробнее см. В официальном документе о соединителе .
Если вы не возражаете против дополнительных зависимостей, вы можете развернуть Apache Phoenix сопоставьте свои таблицы HBase (проверьте, например, PHOENIX-447 ), а затем используйте официальный соединитель или встроенный источник JDBC для записи ваших данных.
За дополнительную плату это обеспечит гораздо лучший пользовательский опыт.Например, если вы определили таблицу Phoenix как:
CREATE TABLE foobar (
id VARCHAR NOT NULL PRIMARY KEY,
foo INTEGER,
bar VARCHAR
);
, вы могли бы
SparkR:::callJStatic(
"java.lang.Class", "forName",
"org.apache.phoenix.jdbc.PhoenixDriver"
)
df <- createDataFrame(data.frame(
id = c("1000", "2000"), foo = c(1, 2), bar = c("x", "y")
))
write.df(
dfr, source = "org.apache.phoenix.spark",
# Note that the only supported mode is `overwrite`,
# which in fact works like `UPSERT`
mode = "overwrite",
table = "FooBar",
# ZooKeeper URL
zkUrl = "host:port"
)
Подобно первому варианту, вы должны будете включить соответствующий разъем .Однако, в отличие от разъема HBase, он не является самодостаточным и требует использования jar-файлов Phoenix Core и Client на CLASSPATH
.
* Не забудьте настроить версию пакета в будущем.