Я изучаю HBase, я уже разработал задание MapReduce в Python, но я так и не научился Java ...
Я хочу написать MapReduce для извлечения групповых данных из файла и создания Hbase Таблица.
Я использую Eclipse (и виртуальную машину с Quickstart Cloudera).
У меня есть файл, содержащий много данных. Данные хранятся следующим образом :
ДАТА | ВРЕМЯ | АДРЕС | КЛЮЧЕВЫЕ СЛОВА
Данные разделены запятыми :
05/01 / 2005,20: 53: 23,5 Williams camp Lake Janechester TW4X 5PZ, nu eta gamma alpha,
11/17 / 2002,09: 34: 49, квартира 43 Kathryn терраса East Wayne E53 2GG, гамма макс бета,
04/23 / 2004,13: 46: 46, квартира 5 Кэтлин через Jakeland PR2 2YJ, макс альфа ню бета мин гамма среднее значение Валь-Зета,
12/03 / 2006,07: 22: 26 750 Квадрат Мариана Robertsport S3A 1AA, eta,
03/16 / 2002,21: 39: 51,8 Грегори Кей Норт Шейнфорт G46 6XW, бета ,
08/20 / 2002,20: 27: 11,342 Джоан Туннель Turnerhaven BS1A 9HJ, альфа-дзета-бета-среднее значение nu min max,
11/06 / 2001,09: 50: 19, Flat 64 Holland walk Connorchester B8J 4JE, бета альфа-гамма,
...
Каждая строка таблицы (таблица с именем myTable) должна иметь:
- RowKey (то есть md5 га sh линии)
- Один семейный столбец с 4 столбцами (один из файлов)
Я написал следующую программу:
package ImportDataFromFileToHBase;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
//import org.apache.hadoop.hbase.client.HBaseAdmin;
//import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import com.amazonaws.thirdparty.apache.codec.digest.DigestUtils;
public class DataFromFileToHBase {
static void CreateTable(Admin admin, String tableName, String... families) throws IOException {
TableName tn = TableName.valueOf(tableName);
if (admin.tableExists(tn)) {
System.out.println("Table " + tableName + " exists");
System.out.println("Delete table : " + tableName);
DeleteTable(admin, tableName);
}
HTableDescriptor htd = new HTableDescriptor(tn);
for (String family: families) {
htd.addFamily(new HColumnDescriptor(family));
}
admin.createTable(htd);
}
static void DeleteTable(Admin admin, String tableName) throws IOException {
TableName tn = TableName.valueOf(tableName);
if (admin.tableExists(tn)) {
admin.disableTable(tn);
admin.deleteTable(tn);
}
}
public static void main(String[] args) throws IOException {
int count = 0;
String fileName = "RandomDataFile_light.txt";
//Create File Object
File f = new File(fileName);
System.out.println("Absolute path : " + f.getAbsolutePath());
System.out.println("file name : " + f.getName());
BufferedReader inputFile = null;
String line = "";
Configuration conf = HBaseConfiguration.create();
Connection connection = null;
Admin admin = null;
Table myTable = null;
Put p = null;
String tableName = "myTable";
try {
// Connexion to HBase ?
connection = ConnectionFactory.createConnection(conf);
admin = connection.getAdmin();
// Creation of the Table
CreateTable(admin, tableName, "column1");
myTable = connection.getTable(TableName.valueOf(tableName));
inputFile = new BufferedReader(new InputStreamReader(new FileInputStream(fileName), "UTF-8"));
line = inputFile.readLine();
while (line != null) {
count += 1;
byte[] rowKey = DigestUtils.md5(line);
List<String> dataList = Arrays.asList(line.split(","));
// System.out.println(dataList.get(1));
// p = new Put(Bytes.toBytes("row" + count));
p = new Put(rowKey);
p.addColumn(Bytes.toBytes("column1"), Bytes.toBytes("date"), Bytes.toBytes(dataList.get(0)));
p.addColumn(Bytes.toBytes("column1"), Bytes.toBytes("hour"), Bytes.toBytes(dataList.get(1)));
p.addColumn(Bytes.toBytes("column1"), Bytes.toBytes("address"), Bytes.toBytes(dataList.get(2)));
p.addColumn(Bytes.toBytes("column1"), Bytes.toBytes("keywords"), Bytes.toBytes(dataList.get(3)));
myTable.put(p);
if (count%100 == 0) {
System.out.print("-");
}
if (count%1000 == 0) {
System.out.println(" " + count);
}
line = inputFile.readLine();
}
System.out.println("Job Done !");
} catch (Exception e) {
return;
} finally {
// closing input file
if (inputFile != null)
inputFile.close();
// closing admin
if (admin != null) {
admin.close();
}
// closing connection
if (connection != null) {
connection.close();
}
// closing myTable
if (myTable != null) {
myTable.close();
}
}
}
}
И это работает отлично (для новичка в Java, это хорошая победа;))
Ну, теперь мне нужно "перевести" эту программу в MapReduce Job , и я полностью потерян. Понятия слишком сложны для меня. После более чем 1-недельной исследовательской работы, мне нужны ваши знания .
Сначала я собираюсь сделать следующее:
- Основная программа
Создать пустую таблицу Создать объект Put - Карта Программа
Входная строка (просто строка или файл?)
Вывод (ключ, значение), ключ = md5 га sh, а значение - строка
- Уменьшение программы
Ввод (ключ) , значение) из Mapper
Ouput (ключ, значение, пут), команды для пут описаны в Редукторе
Я ошибаюсь?
Как написать это задание MapReduce?
Спасибо за вашу помощь.