Массовая загрузка AWS EMR HBase - PullRequest
       22

Массовая загрузка AWS EMR HBase

0 голосов
/ 04 сентября 2018

Я разработал программу Map Reduce для выполнения массовой загрузки HBase, используя технику, описанную в этой статье Cloudera: https://blog.cloudera.com/blog/2013/09/how-to-use-hbase-bulk-loading-and-why/.

На нашем предыдущем кластере Cloudera Hadoop он работал очень хорошо. Теперь мы переходим на AWS. Мне не удается заставить эту программу работать на кластере AWS EMR.

детали EMR:

  • Метка выпуска: emr-5.16.0
  • Распределение Hadoop: Amazon 2.8.4
  • Приложения: Spark 2.3.1, HBase 1.4.4
  • Мастер: m4.4xlarge
  • Узлы: 12 x m4.4xlarge

Вот код моего драйвера

        Job job = Job.getInstance(getConf());
        job.setJobName("My job");
        job.setJarByClass(getClass());

        // Input
        FileInputFormat.setInputPaths(job, input);

        // Mapper
        job.setMapperClass(MyMapper.class);
        job.setInputFormatClass(ExampleInputFormat.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        // Reducer : Auto configure partitioner and reducer
        Table table = HBaseCnx.getConnection().getTable(TABLE_NAME);
        RegionLocator regionLocator = HBaseCnx.getConnection().getRegionLocator(TABLE_NAME);
        HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);

        // Output
        Path out = new Path(output);
        FileOutputFormat.setOutputPath(job, out);

        // Launch the MR job
        logger.debug("Start - Map Reduce job to produce HFiles");
        boolean b = job.waitForCompletion(true);
        if (!b) throw new RuntimeException("FAIL - Produce HFiles for HBase bulk load");
        logger.debug("End - Map Reduce job to produce HFiles");

        // Make the output HFiles usable by HBase (permissions)
        logger.debug("Start - Set the permissions for HBase in the output dir " + out.toString());
        //fs.setPermission(outputPath, new FsPermission(ALL, ALL, ALL)); => not recursive
        FsShell shell = new FsShell(getConf());
        shell.run(new String[]{"-chmod", "-R", "777", out.toString()});
        logger.debug("End - Set the permissions for HBase in the output dir " + out.toString());

        // Run complete bulk load
        logger.debug("Start - HBase Complete Bulk Load");
        LoadIncrementalHFiles loadIncrementalHFiles = new LoadIncrementalHFiles(getConf());
        int loadIncrementalHFilesOutput = loadIncrementalHFiles.run(new String[]{out.toString(), TABLE_NAME.toString()});
        if (loadIncrementalHFilesOutput != 0) {
            throw new RuntimeException("Problem in LoadIncrementalHFiles. Return code is " + loadIncrementalHFiles);
        }
        logger.debug("End - HBase Complete Bulk Load");

Мой картограф читает паркетные файлы и выдает:

  • ключ, который является ключом строки для Put as ImmutableBytesWritable
  • значение, которое является путём HBase

Проблема возникает на шаге уменьшения. В «системном журнале» каждого Редуктора я получал сообщения об ошибках, связанных с соединениями с сокетом. Вот часть системного журнала:

2018-09-04 08:21:39,085 INFO [main-SendThread(localhost:2181)] org.apache.zookeeper.ClientCnxn: Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
2018-09-04 08:21:39,086 WARN [main-SendThread(localhost:2181)] org.apache.zookeeper.ClientCnxn: Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused
2018-09-04 08:21:55,705 ERROR [main] org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper: ZooKeeper exists failed after 4 attempts
2018-09-04 08:21:55,705 WARN [main] org.apache.hadoop.hbase.zookeeper.ZKUtil: hconnection-0x3ecedf210x0, quorum=localhost:2181, baseZNode=/hbase Unable to set watcher on znode (/hbase/hbaseid)
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
2018-09-04 08:21:55,706 ERROR [main] org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher: hconnection-0x3ecedf210x0, quorum=localhost:2181, baseZNode=/hbase Received unexpected KeeperException, re-throwing exception
org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
2018-09-04 08:21:55,706 WARN [main] org.apache.hadoop.hbase.client.ZooKeeperRegistry: Can't retrieve clusterId from Zookeeper

После нескольких поисков в Google я нашел несколько постов, в которых предлагалось установить IP кворума непосредственно в коде Java. Я так и сделал, но это не сработало. Вот как я сейчас получаю соединение HBase

Configuration conf = HBaseConfiguration.create();
conf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"));

    // Attempts to set directly the quorum IP in the Java code that did not work
    //conf.clear();
    //conf.set("hbase.zookeeper.quorum", "...ip...");
    //conf.set("hbase.zookeeper.property.clientPort", "2181");

    Connection cnx = ConnectionFactory.createConnection(conf);

Чего я не понимаю, так это того, что все остальное работает. Я могу программно создавать таблицы, запрашивать таблицы (Scan или Get). Я даже могу использовать работу MR, которая вставляет данные с TableMapReduceUtil.initTableReducerJob("my_table", IdentityTableReducer.class, job);. Но, конечно, он намного менее быстр, чем метод полной загрузки HBase, который напрямую записывает HFiles, разбитые по существующим регионам.

Спасибо за помощь

1 Ответ

0 голосов
/ 19 октября 2018

Я работал над аналогичной миграцией. Проблема заключается в том, что редуктор работает в отдельном процессе, поэтому вам нужно установить кворум в конфигурации задания. Это сделает значение доступным для редуктора.

job.getConfiguration().set("hbase.zookeeper.quorum", "...ip...");

...