Как прочитать все данные из семейства столбцов, используя scala - PullRequest
0 голосов
/ 09 ноября 2019

Я новичок в Scala и hbase. Моя цель - прочитать все данные из определенного семейства столбцов из hbase, чтобы выполнить некоторую подготовку данных для будущего использования машинного обучения.

Чтобы сделать это и, как я уже говорил, «Я новичок», мой первый шаг -читать данные из hbase. Мне просто удалось прочитать данные из заданной строки, семейства столбцов и столбца. Это мой код, возможно, он хорошо поможет любому его использовать.

Теперь мой второй шаг - прочитать все данные из данного семейства столбцов. Я только что попробовал несколько примеров, но никто не работает

//Code to retrieve data from a given row, CF and column, this code works
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
val conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum","hadoop-master")
//conf.set("hbase.zookeeper.property.clientPort","16010")
val connection = ConnectionFactory.createConnection(conf)
val table =connection.getTable(TableName.valueOf("mimic3"))
val row  = Bytes.toBytes("10150") // 1 is the row id
val cf = Bytes.toBytes("sepsiscategories")
val c = Bytes.toBytes("intime")
val query = new Get(row)
val res = table.get(query)
res.getValue(cf,c)
Bytes.toString(res.getValue(cf,c))
//this query works well , now I will try to retreive all records             from HBase and I will put them on an RDD

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.fs.{FileSystem,FSDataInputStream,Path}
import java.net.URI
import java.io.File
import java.util.Properties
import java.sql.DriverManager
import org.apache.spark.sql.{Row,SaveMode}
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.conf.Configuration._
import spark.implicits._
import spark.sql

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val mimic_table_conf = HBaseConfiguration.create();
mimic_table_conf.set(TableInputFormat.INPUT_TABLE,"mimic3")
mimic_table_conf.set("hbase.zookeeper.quorum","hadoop-master")
mimic_table_conf.set("hbase.zookeeper.property.clientPort","16010")
val mimic_PatternsFromHbase = spark.sparkContext.newAPIHadoopRDD(mimic_table_conf,classOf[TableInputFormat],classOf[ImmutableBytesWritable], classOf[Result])

val sepsiscategories = mimic_PatternsFromHbase.mapPartitions(f=>       f.map(row1 =>    (Bytes.toString(row1._2.getRow),Bytes.toString(row1._2.getValue(Bytes.to    Bytes("sepsiscategories"),Bytes.toBytes("admissiontype")))))).toDF("id",    "admissiontype")
sepsiscategories.createOrReplaceTempView("sep_categories")
spark.sql("select * from sep_categories").show

Я просто ожидаю увидеть результат, но у меня просто есть эта ошибка:

19/11/09 12:18: 29 WARN zookeeper.ClientCnxn: Сессия 0x0 для сервера>> hadoop-master / 172.18.0.2: 16010, непредвиденная ошибка, закрытие сокета> соединения и попытка переподключения java.io.IOException: Пакет len1213486160 находится вне диапазона!

Может кто-нибудь предложить другое предложение, потому что мне не удалось решить эту проблему, я не могу найти файл zkCli.sh для обновления параметра -Djute.maxbuffer, как указано здесь: https://stackoverflow.com/a/19990613/5674606

PS: кластер, который я использую, основан на образе докера, это ссылка https://kiwenlau.com/2016/06/26/hadoop-cluster-docker-update-english/

...