Флаг ORC isNull [] не устанавливается после первоначального пакета - PullRequest
0 голосов
/ 05 сентября 2018

У меня есть следующий фрагмент кода для записи и чтения файла ORC. Во время записи для каждой 100-й записи я делаю значения столбца пустыми, устанавливая флаг isNull [] в столбце. Однако при чтении файла этот флаг, похоже, устанавливается только для первого пакета.

Вывод выглядит так:

row #0 col1:null col2:null   <- expected
row #1 col1:X1 col2:3
row #2 col1:X2 col2:6
row #3 col1:X3 col2:9
row #4 col1:X4 col2:12
row #5 col1:X5 col2:15
..
..
row #99 col1:X99 col2:297
row #100 col1:null col2:null <- expected
row #101 col1:X101 col2:303
..
row #999 col1:X999 col2:2997
row #1000 col1: col2:        <-- not expected.isNull seems false?
row #1001 col1:X1001 col2:3003
..
row #1399 col1:X1399 col2:4197
row #1400 col1: col2:0       <-- not expected.isNull seems false?
row #1401 col1:X1401 col2:4203

Код:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.*;

public class Test {

    public static void main(String args[]) throws Exception {

        String file="file:///Users/ashika.umagiliya/my-file21.orc";
        //write(file);
        read(file);


    }

    private static void write(String file) throws Exception {
        Configuration conf = new Configuration();

        TypeDescription schema = TypeDescription.createStruct();//TypeDescription.fromString("struct<x:int,y:int>");
        schema.addField("x", TypeDescription.createString());
        schema.addField("y", TypeDescription.createInt());
        Writer writer = OrcFile.createWriter(new Path(file),
                OrcFile.writerOptions(conf)
                        .bufferSize(1024*1000)
                        .compress(CompressionKind.NONE)
                        .setSchema(schema));

        VectorizedRowBatch batch = schema.createRowBatch(1000);

        BytesColumnVector x = (BytesColumnVector) batch.cols[0];
        x.noNulls = false;
        LongColumnVector y = (LongColumnVector) batch.cols[1];
        y.noNulls = false;

        for (int r = 0; r < 10000; ++r) {

            int row = batch.size++;
            byte[] val1 = ("X" + r).getBytes("UTF-8");
            long val2 = r*3;

            if (r % 100 == 0) { //save nulls
                    x.isNull[row] = true;
                    y.isNull[row] = true;
                    byte[] tmp=new byte[0];
                    x.setVal(row,tmp,0,0);

                } else{ //save not-nulls
                    x.setVal(row, val1);
                    y.vector[row]=val2;
                }
                if (batch.size == batch.getMaxSize()) {
                    writer.addRowBatch(batch);
                    batch.reset();
                }
            }
            if (batch.size != 0) {
                writer.addRowBatch(batch);
                batch.reset();
            }
            writer.close();
        }


    private static void read(String file) throws Exception {
        Configuration conf = new Configuration();
        Reader reader = OrcFile.createReader(new Path(file),
                OrcFile.readerOptions(conf));
        RecordReader rows = reader.rows();
        VectorizedRowBatch batch = reader.getSchema().createRowBatch();
        BytesColumnVector col1 =(BytesColumnVector)batch.cols[0];
        LongColumnVector col2 =(LongColumnVector)batch.cols[1];

        int row=0;
        while (rows.nextBatch(batch)) {
            for(int r=0; r < batch.size; ++r) {

                byte[] b1 = col1.vector[r];
                long val2 = col2.vector[r];
                System.out.println( ("row #"+row+" col1:"+(col1.isNull[r] ? "null" : col1.toString(r)))  +" col2:"+(col2.isNull[r] ? "null" : val2));
                row++;

            }
            //row++;
        }
        rows.close();
        String s=null;
        System.out.println(s);
    }
}
...