Импорт данных из файла в HBase - MapReduce Job - PullRequest
0 голосов
/ 17 апреля 2020

Я изучаю HBase, я уже разработал задание MapReduce в Python, но я так и не научился Java ...
Я хочу написать MapReduce для извлечения групповых данных из файла и создания Hbase Таблица.
Я использую Eclipse (и виртуальную машину с Quickstart Cloudera).


У меня есть файл, содержащий много данных. Данные хранятся следующим образом :
ДАТА | ВРЕМЯ | АДРЕС | КЛЮЧЕВЫЕ СЛОВА
Данные разделены запятыми :
05/01 / 2005,20: 53: 23,5 Williams camp Lake Janechester TW4X 5PZ, nu eta gamma alpha,
11/17 / 2002,09: 34: 49, квартира 43 Kathryn терраса East Wayne E53 2GG, гамма макс бета,
04/23 / 2004,13: 46: 46, квартира 5 Кэтлин через Jakeland PR2 2YJ, макс альфа ню бета мин гамма среднее значение Валь-Зета,
12/03 / 2006,07: 22: 26 750 Квадрат Мариана Robertsport S3A 1AA, eta,
03/16 / 2002,21: 39: 51,8 Грегори Кей Норт Шейнфорт G46 6XW, бета ,
08/20 / 2002,20: 27: 11,342 Джоан Туннель Turnerhaven BS1A 9HJ, альфа-дзета-бета-среднее значение nu min max,
11/06 / 2001,09: 50: 19, Flat 64 Holland walk Connorchester B8J 4JE, бета альфа-гамма,
...

Каждая строка таблицы (таблица с именем myTable) должна иметь:

  • RowKey (то есть md5 га sh линии)
  • Один семейный столбец с 4 столбцами (один из файлов)

Я написал следующую программу:

package ImportDataFromFileToHBase;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
//import org.apache.hadoop.hbase.client.HBaseAdmin;
//import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import com.amazonaws.thirdparty.apache.codec.digest.DigestUtils;

public class DataFromFileToHBase {

    static void CreateTable(Admin admin, String tableName, String... families) throws IOException {
        TableName tn = TableName.valueOf(tableName);
        if (admin.tableExists(tn)) {
            System.out.println("Table " + tableName + " exists");
            System.out.println("Delete table : " + tableName);
            DeleteTable(admin, tableName);
        }
        HTableDescriptor htd = new HTableDescriptor(tn);
        for (String family: families) {
            htd.addFamily(new HColumnDescriptor(family));
        }
        admin.createTable(htd);
    }

    static void DeleteTable(Admin admin, String tableName) throws IOException {
        TableName tn = TableName.valueOf(tableName);
        if (admin.tableExists(tn)) {
            admin.disableTable(tn);
            admin.deleteTable(tn);
        }
    }

    public static void main(String[] args) throws IOException {

        int count = 0;

        String fileName = "RandomDataFile_light.txt";
        //Create File Object
        File f = new File(fileName);
        System.out.println("Absolute path : " + f.getAbsolutePath());
        System.out.println("file name     : " + f.getName());
        BufferedReader inputFile = null;
        String line = "";

        Configuration conf = HBaseConfiguration.create();
        Connection connection = null;
        Admin admin = null;
        Table myTable = null;
        Put p = null;
        String tableName = "myTable";

        try {
            // Connexion to HBase ?
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();

            // Creation of the Table
            CreateTable(admin, tableName, "column1");
            myTable = connection.getTable(TableName.valueOf(tableName));

            inputFile = new BufferedReader(new InputStreamReader(new FileInputStream(fileName), "UTF-8"));
            line = inputFile.readLine();

            while (line != null) {
                count += 1;

                byte[] rowKey = DigestUtils.md5(line);

                List<String> dataList = Arrays.asList(line.split(","));
//              System.out.println(dataList.get(1));

//              p = new Put(Bytes.toBytes("row" + count));
                p = new Put(rowKey);
                p.addColumn(Bytes.toBytes("column1"), Bytes.toBytes("date"), Bytes.toBytes(dataList.get(0)));
                p.addColumn(Bytes.toBytes("column1"), Bytes.toBytes("hour"), Bytes.toBytes(dataList.get(1)));
                p.addColumn(Bytes.toBytes("column1"), Bytes.toBytes("address"), Bytes.toBytes(dataList.get(2)));
                p.addColumn(Bytes.toBytes("column1"), Bytes.toBytes("keywords"), Bytes.toBytes(dataList.get(3)));

                myTable.put(p);
                if (count%100 == 0) {
                    System.out.print("-");
                }
                if (count%1000 == 0) {
                    System.out.println(" " + count);
                }

                line = inputFile.readLine();
             }

            System.out.println("Job Done !");

        } catch (Exception e) {
            return;
        } finally {
            // closing input file
            if (inputFile != null)
                inputFile.close();

            // closing admin
            if (admin != null) {
                admin.close();
            }
            // closing connection
            if (connection != null) {
                connection.close();
            }
            // closing myTable
            if (myTable != null) {
                myTable.close();
            }
        }
    }

}

И это работает отлично (для новичка в Java, это хорошая победа;))

Ну, теперь мне нужно "перевести" эту программу в MapReduce Job , и я полностью потерян. Понятия слишком сложны для меня. После более чем 1-недельной исследовательской работы, мне нужны ваши знания .

Сначала я собираюсь сделать следующее:
- Основная программа
Создать пустую таблицу Создать объект Put - Карта Программа
Входная строка (просто строка или файл?)
Вывод (ключ, значение), ключ = md5 га sh, а значение - строка
- Уменьшение программы
Ввод (ключ) , значение) из Mapper
Ouput (ключ, значение, пут), команды для пут описаны в Редукторе

Я ошибаюсь?
Как написать это задание MapReduce?

Спасибо за вашу помощь.

...