Как объединить или объединить небольшие файлы ORC в файл большего размера? - PullRequest
0 голосов
/ 26 апреля 2018

В большинстве вопросов / ответов по SO и в Интернете обсуждается использование Hive для объединения нескольких небольших файлов ORC в более крупный, однако мои файлы ORC - это файлы журнала, которые разделены по дням, и мне нужно хранить их отдельно. Я только хочу "свернуть" файлы ORC в день (которые являются каталогами в HDFS).

Мне нужно написать решение на Java, скорее всего, и наткнулся на OrcFileMergeOperator , который может быть тем, что мне нужно использовать, но пока рано говорить.

Каков наилучший подход к решению этой проблемы?

Ответы [ 2 ]

0 голосов
/ 27 апреля 2018

Здесь есть хорошие ответы, но ни один из них не позволяет мне выполнять работу cron, чтобы я мог делать ежедневные свертки. У нас есть файлы журнала journald, которые ежедневно пишут в HDFS, и я не хочу каждый день запускать запрос в Hive, когда захожу.

То, что я закончил, мне показалось более простым. Я написал программу на Java, которая использует библиотеки ORC для сканирования всех файлов в каталоге и создает список этих файлов. Затем открывает новый Writer, который является «комбинированным» файлом (который начинается с «.», Поэтому он скрыт от Hive, иначе Hive завершится ошибкой). Затем программа открывает каждый файл в списке, считывает содержимое и записывает в объединенный файл. Когда все файлы были прочитаны, он удаляет файлы. Я также добавил возможность запускать один каталог за раз в случае необходимости.

ПРИМЕЧАНИЕ. Вам понадобится файл схемы. Журналы журнала можно выводить в формате json "journalctl -o json", а затем вы можете использовать инструменты Apache ORC для создания файла схемы или сделать это вручную. Auto-gen от ORC - это хорошо, но руководство всегда лучше.

ПРИМЕЧАНИЕ. Чтобы использовать этот код "как есть", вам потребуется действительная таблица ключей и добавить -Dkeytab = в ваш путь к классу.

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;

import com.cloudera.org.joda.time.LocalDate;

public class OrcFileRollUp {

  private final static String SCHEMA = "journald.schema";
  private final static String UTF_8 = "UTF-8";
  private final static String HDFS_BASE_LOGS_DIR = "/<baseDir>/logs";
  private static final String keytabLocation = System.getProperty("keytab");
  private static final String kerberosUser = "<userName>";
  private static Writer writer;

  public static void main(String[] args) throws IOException {

    Configuration conf = new Configuration();
    conf.set("hadoop.security.authentication", "Kerberos");

    InetAddress myHost = InetAddress.getLocalHost();
    String kerberosPrincipal = String.format("%s/%s", kerberosUser, myHost.getHostName());
    UserGroupInformation.setConfiguration(conf);
    UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabLocation);

    int currentDay = LocalDate.now().getDayOfMonth();
    int currentMonth = LocalDate.now().getMonthOfYear();
    int currentYear = LocalDate.now().getYear();

    Path path = new Path(HDFS_BASE_LOGS_DIR);

    FileSystem fileSystem = path.getFileSystem(conf);
    System.out.println("The URI is: " + fileSystem.getUri());


    //Get Hosts:
    List<String> allHostsPath = getHosts(path, fileSystem);

    TypeDescription schema = TypeDescription.fromString(getSchema(SCHEMA)
        .replaceAll("\n", ""));

    //Open each file for reading and write contents
    for(int i = 0; i < allHostsPath.size(); i++) {

      String outFile = "." + currentYear + "_" + currentMonth + "_" + currentDay + ".orc.working";            //filename:  .2018_04_24.orc.working

      //Create list of files from directory and today's date OR pass a directory in via the command line in format 
      //hdfs://<namenode>:8020/HDFS_BASE_LOGS_DIR/<hostname>/2018/4/24/
      String directory = "";
      Path outFilePath;
      Path argsPath;
      List<String> orcFiles;

      if(args.length == 0) {
        directory = currentYear + "/" + currentMonth + "/" + currentDay;
        outFilePath = new Path(allHostsPath.get(i) + "/" + directory + "/" + outFile);
        try {
          orcFiles = getAllFilePath(new Path(allHostsPath.get(i) + "/" + directory), fileSystem);
        } catch (Exception e) {
          continue;
        }
      } else {
        outFilePath = new Path(args[0] + "/" + outFile);
        argsPath = new Path(args[0]);
        try {
          orcFiles = getAllFilePath(argsPath, fileSystem);
        } catch (Exception e) {
          continue;
        }
      }

      //Create List of files in the directory

      FileSystem fs = outFilePath.getFileSystem(conf);

      //Writer MUST be below ^^ or the combination file will be deleted as well.
      if(fs.exists(outFilePath)) {
        System.out.println(outFilePath + " exists, delete before continuing.");
      } else {
       writer = OrcFile.createWriter(outFilePath, OrcFile.writerOptions(conf)
            .setSchema(schema));
      }

      for(int j = 0; j < orcFiles.size(); j++ ) { 
        Reader reader = OrcFile.createReader(new Path(orcFiles.get(j)), OrcFile.readerOptions(conf));

        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        RecordReader rows = reader.rows();

        while (rows.nextBatch(batch)) {
          if (batch != null) {
             writer.addRowBatch(batch);
          }
        }
        rows.close();
        fs.delete(new Path(orcFiles.get(j)), false);
      }
      //Close File
      writer.close();

      //Remove leading "." from ORC file to make visible to Hive
      outFile = fileSystem.getFileStatus(outFilePath)
                                      .getPath()
                                      .getName();

      if (outFile.startsWith(".")) {
        outFile = outFile.substring(1);

        int lastIndexOf = outFile.lastIndexOf(".working");
        outFile = outFile.substring(0, lastIndexOf);
      }

      Path parent = outFilePath.getParent();

      fileSystem.rename(outFilePath, new Path(parent, outFile));

      if(args.length != 0)
        break;
    }
  }

  private static String getSchema(String resource) throws IOException {
    try (InputStream input = OrcFileRollUp.class.getResourceAsStream("/" + resource)) {
      return IOUtils.toString(input, UTF_8);
    }
  }

  public static List<String> getHosts(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
    List<String> hostsList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) {
      hostsList.add(fileStat.getPath().toString());
    }
    return hostsList;
  }

  private static List<String> getAllFilePath(Path filePath, FileSystem fs) throws FileNotFoundException, IOException {
    List<String> fileList = new ArrayList<String>();
    FileStatus[] fileStatus = fs.listStatus(filePath);
    for (FileStatus fileStat : fileStatus) {
      if (fileStat.isDirectory()) {
        fileList.addAll(getAllFilePath(fileStat.getPath(), fs));
      } else {
        fileList.add(fileStat.getPath()
                             .toString());
      }
    }
    for(int i = 0; i< fileList.size(); i++) {
      if(!fileList.get(i).endsWith(".orc"))
        fileList.remove(i);
    }

    return fileList;
  }

}
0 голосов
/ 26 апреля 2018

Вам не нужно заново изобретать колесо.

ALTER TABLE table_name [PARTITION partition_spec] CONCATENATE может использоваться для объединения небольших файлов ORC в файл большего размера, начиная с Hive 0.14.0. Объединение происходит на уровне чередования, что позволяет избежать распаковки и декодирования данных. Работает быстро. Я бы предложил создать внешнюю таблицу, разделенную по дням (разделы - это каталоги), а затем объединить их, указав PARTITION (day_column) в качестве спецификации раздела.

См. Здесь: LanguageManual + ORC

...