Можем ли мы одновременно писать и читать ORC-файл программно в Java? - PullRequest
0 голосов
/ 02 апреля 2019

Я хочу хранить свои данные в столбчатом формате.Когда я пишу свои данные, следующий набор строк может потребовать от меня обработки некоторых строк, которые уже были вставлены в существующий файл ORC.Согласно нашему пониманию, мы не можем читать данные из файла ORC, пока не закроем программу записи.Мы хотим сэкономить место на диске, поскольку файл ORC будет занимать минимальный размер блока, поэтому нам нужна возможность параллельного чтения и записи для одного и того же файла.

Ниже приведен фрагмент кода для лучшего разъяснения того API, которым мы являемсяиспользуя:

void writeRecordsInBatch(Configuration conf, String path) throws IOException {

    TypeDescription schema1 = TypeDescription.createStruct();
    schema1.addField("rownum", TypeDescription.createInt());

    for(int i=1; i<=5; i++){
        schema1.addField("m"+i, TypeDescription.createDouble());
    }

    OrcFile.WriterOptions opts = OrcFile.writerOptions(conf);
    opts.setSchema(schema1);
    opts.rowIndexStride(1024);
    Writer writer = OrcFile.createWriter(new Path(path),opts);

    VectorizedRowBatch batch = schema1.createRowBatch(5000);
    batch.setPartitionInfo(1, 1);
    Random rndm =new Random();
    LongColumnVector rownum = (LongColumnVector) batch.cols[0];
    for(int r=1; r <= 1_000_000; ++r) {
        int row = batch.size++;
        rownum.vector[row] = r;
        for(int i=1; i<=5 ; i++){
            ((DoubleColumnVector)batch.cols[i]).vector[row] = rndm.nextDouble();
        }

        // If the batch is full, write it out and start over.
        if (batch.size == batch.getMaxSize()) {
            writer.addRowBatch(batch);
            batch.reset();
        }
    }
    if (batch.size != 0) {
        writer.addRowBatch(batch);
        batch.reset();
    }
    writer.close();
}


void readRecords(Configuration conf, String path) throws IOException {
    Reader reader = OrcFile.createReader(new Path(path), OrcFile.readerOptions(conf));

    //required while applying search arguments
    String[]  columnsToRead = new String[3];
    columnsToRead[0] = "rownum";
    columnsToRead[1] = "m1";
    columnsToRead[2] = "m2";

    SearchArgument.Builder argumentBuilder= SearchArgumentFactory.newBuilder();
    argumentBuilder.lessThanEquals("rownum", PredicateLeaf.Type.LONG, 1001l);
    SearchArgument searchArgument  = argumentBuilder.build();

    boolean[] include = new boolean[reader.getMetadataSize()];
    include[1] = true;
    include[3]=true; // index starts from 1 not 0..
    include[5]=true;

    org.apache.orc.Reader.Options options = new org.apache.orc.Reader.Options();
    options.include(include);
    options.searchArgument(searchArgument, columnsToRead);

    RecordReader rows = reader.rowsOptions(options);

    int r=0;
    Object row = null;
    while (rows.hasNext())
    {
        row = rows.next(row);
        if(r%1000 == 0)
        {
            System.out.println(r+" :: " + row);
        }
        r++;
    }
    rows.close();
}
...