Я создал Jar для своего проекта Scala, а затем выполнил следующую команду из терминала [com.sukrit.hbase_ "- это имя пакета, а" Filters_Usage "- это класс scala, который я хочу запустить]
Macintosh:bin sukritmehta$ ./spark-submit --class "com.sukrit.hbase_.Filters_Usage" --master local[*] "/Users/sukritmehta/Desktop/Sukrit/Spark_Hbase/target/Spark_Hbase-0.0.1-SNAPSHOT.jar"
Но после этого я сталкиваюсь со следующими ошибками:
20/04/24 20:53:01 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Error: Failed to load class com.sukrit.hbase_.Filters_Usage.
20/04/24 20:53:02 INFO util.ShutdownHookManager: Shutdown hook called
20/04/24 20:53:02 INFO util.ShutdownHookManager: Deleting directory /private/var/folders/d4/psn4wv8s7tjbfgt6gkt35z9c0000gq/T/spark-ae120675-a1c6-4300-997c-bd53f9f35187
pom. xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>Spark_Hbase</groupId>
<artifactId>Spark_Hbase</artifactId>
<version>0.0.1-SNAPSHOT</version>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!-- Scala and Spark dependencies -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.11</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<!-- <dependency> <groupId>org.codehaus.janino</groupId> <artifactId>commons-compiler</artifactId>
<version>3.0.7</version> </dependency> <dependency> <groupId>org.apache.spark</groupId>
<artifactId>spark-network-common_2.11</artifactId> <version>2.1.1</version>
</dependency> -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.9.9</version>
</dependency>
<!-- <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.10</artifactId>
<version>2.1.1</version> </dependency> -->
<!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.10</artifactId>
<version>2.1.1</version> </dependency> -->
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.2.4</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency> -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-spark</artifactId>
<version>2.0.0-alpha4</version> <!-- Hortonworks Latest -->
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-mapreduce -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>2.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.5</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Filters_Usage. scala
package com.sukrit.hbase_
import org.apache.hadoop.conf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.mapred.JobConf
import org.apache.spark._
import org.apache.log4j._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.Scan
import scala.collection.JavaConverters._
import org.apache.hadoop.hbase.filter._
object Filters_Usage {
def printRow(result : Result) = {
val cells = result.rawCells();
print( Bytes.toString(result.getRow) + " : " )
for(cell <- cells){
val col_name = Bytes.toString(CellUtil.cloneQualifier(cell))
val col_value = Bytes.toString(CellUtil.cloneValue(cell))
print("(%s:%s) ".format(col_name, col_value))
}
println()
}
def main(args : Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val sparkConf = new SparkConf().setAppName("Spark_Hbase_Connection").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
conf.set("hbase.rpc.timeout", "1800000");
conf.set("hbase.client.scanner.timeout.period", "1800000");
// establish a connection
val connection:Connection = ConnectionFactory.createConnection(conf)
// Table on which different commands have to be run.
val tableName = connection.getTable(TableName.valueOf("emp"))
sc.addJar("/Users/sukritmehta/Desktop/Sukrit/Spark_Hbase/target/Spark_Hbase-0.0.1-SNAPSHOT.jar")
println("\n **************** All rows in Table **************** \n ")
val scan:Scan = new Scan()
val resultScanner:ResultScanner = tableName.getScanner(scan)
resultScanner.asScala.foreach(result => {
printRow(result)
})
resultScanner.close()
println(" \n \n ")
// **************** Row Filters ****************
println("**************** Row Filter (for Row_keys <=5) **************** \n ")
val row_filter1 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("5")));
scan.setFilter(row_filter1)
val resultScanner1:ResultScanner = tableName.getScanner(scan)
resultScanner1.asScala.foreach(result => {
printRow(result)
})
resultScanner1.close()
println(" \n \n ")
println("**************** Row Filter (for Row_keys starting with 1) **************** \n ")
val row_filter2 = new RowFilter(CompareFilter.CompareOp.EQUAL,
new SubstringComparator("1"));
scan.setFilter(row_filter2)
val resultScanner2:ResultScanner = tableName.getScanner(scan)
resultScanner2.asScala.foreach(result => {
printRow(result)
})
resultScanner2.close()
println(" \n \n ")
// **************** Family Filters ****************
println("**************** Family Filter (for Col Family = personal data) **************** \n ")
val fam_filter1 = new FamilyFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes("personal data")));
scan.setFilter(fam_filter1)
val resultScanner3:ResultScanner = tableName.getScanner(scan)
resultScanner3.asScala.foreach(result => {
printRow(result)
})
resultScanner3.close()
println(" \n \n ")
// **************** Qualifier Filters ****************
println("**************** Qualifier Filter (for Column = name) **************** \n ")
val qual_filter1 = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes("name")));
scan.setFilter(qual_filter1)
val resultScanner4:ResultScanner = tableName.getScanner(scan)
resultScanner4.asScala.foreach(result => {
printRow(result)
})
resultScanner4.close()
println(" \n \n ")
println("**************** Qualifier Filter (Printing name corresponding to Row_Key = 5) **************** \n ")
val qual_filter2 = new QualifierFilter(CompareFilter.CompareOp.EQUAL,
new BinaryComparator(Bytes.toBytes("name")));
scan.setFilter(qual_filter2)
val get = new Get(Bytes.toBytes("5")); get.setFilter(qual_filter2);
val result = tableName.get(get);
printRow(result)
println(" \n \n ")
// **************** Filter List ****************
println("**************** Filter List (print all rows between two row_Keys 4 & 7 (lexicograpphically)) **************** \n ")
val ss = new Scan()
val filter1 = new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("4")));
val filter2 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("7")));
var filterList = new FilterList()
filterList.addFilter(filter1)
filterList.addFilter(filter2)
ss.setFilter(filterList)
val resultScanner8:ResultScanner = tableName.getScanner(ss)
resultScanner8.asScala.foreach(result => {
printRow(result)
})
resultScanner8.close()
println(" \n \n ")
// **************** Single Column Value Filter ****************
println("**************** Single Column Value Filter (print all rows having cities lesser than Delhi (lexicographically)) **************** \n ")
val ss3 = new Scan()
val filter5 = new SingleColumnValueFilter( Bytes.toBytes("personal data"),
Bytes.toBytes("city"),
CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("Delhi")));
filter5.setFilterIfMissing(false);
ss3.setFilter(filter5)
val resultScanner10:ResultScanner = tableName.getScanner(ss3)
resultScanner10.asScala.foreach(result => {
printRow(result)
})
resultScanner10.close()
println(" \n \n ")
// **************** Filter List ****************
println("**************** Filter List (print all rows between Delhi and Mumbai (lexicographically)) **************** \n ")
println("****** Before filtering ****** \n ")
val ss2 = new Scan()
val resultScanner9:ResultScanner = tableName.getScanner(ss2)
resultScanner9.asScala.foreach(result => {
printRow(result)
})
resultScanner9.close()
println(" \n \n ")
println("****** After filtering ****** \n ")
val filter3 = new SingleColumnValueFilter( Bytes.toBytes("personal data"),
Bytes.toBytes("city"),
CompareFilter.CompareOp.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("Delhi")));
filter3.setFilterIfMissing(false);
val filter4 = new SingleColumnValueFilter( Bytes.toBytes("personal data"),
Bytes.toBytes("city"),
CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("Mumbai")));
filter4.setFilterIfMissing(false);
var filterList2 = new FilterList()
filterList2.addFilter(filter3)
filterList2.addFilter(filter4)
ss2.setFilter(filterList2)
val resultScanner6:ResultScanner = tableName.getScanner(ss2)
resultScanner6.asScala.foreach(result => {
printRow(result)
})
resultScanner6.close()
println(" \n \n ")
// **************** Filter List ****************
println("**************** Filter List (print all rows Having DOBs between 19770101 and 19951231 (lexicographically)) **************** \n ")
println("****** Before filtering ****** \n ")
val ss4 = new Scan()
val resultScanner11:ResultScanner = tableName.getScanner(ss4)
resultScanner11.asScala.foreach(result => {
printRow(result)
})
resultScanner11.close()
println(" \n \n ")
println("****** After filtering ****** \n ")
val filter7 = new SingleColumnValueFilter( Bytes.toBytes("personal data"),
Bytes.toBytes("dob"),
CompareFilter.CompareOp.GREATER_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("19770101")));
val filter8 = new SingleColumnValueFilter( Bytes.toBytes("personal data"),
Bytes.toBytes("dob"),
CompareFilter.CompareOp.LESS_OR_EQUAL,
new BinaryComparator(Bytes.toBytes("19951231")));
var filterList3 = new FilterList()
filterList3.addFilter(filter7)
filterList3.addFilter(filter8)
ss4.setFilter(filterList3)
val resultScanner12:ResultScanner = tableName.getScanner(ss4)
resultScanner12.asScala.foreach(result => {
printRow(result)
})
resultScanner12.close()
println(" \n \n ")
}
}
Было бы здорово, если бы кто-нибудь мог помочь мне решить эту проблему.