Как записать логический тип TIMESTAMP (INT96) в паркет, используя ParquetWriter? - PullRequest
0 голосов
/ 12 февраля 2019

У меня есть инструмент, который использует org.apache.parquet.hadoop.ParquetWriter для преобразования файлов данных CSV в файлы данных паркета.

В настоящее время он обрабатывает только int32,double и string

Мне нужно поддерживать логический тип паркета timestamp (аннотированный как int96), и я теряюсь, как это сделать, потому что не могу найти точную спецификацию в Интернете.

Похоже, что эта временная метка (int96) встречается редко и плохо поддерживается.Я нашел очень мало деталей спецификации в Интернете. Этот github README утверждает, что:

Метки времени, сохраненные как int96, состоят из наносекунд в дне (первые 8 байт) и юлианском дне (последние 4 байта).

В частности:

  1. Какой паркет Тип использовать для столбца в схеме MessageType ?Я предполагаю, что должен использовать примитивный тип, PrimitiveTypeName.INT96, но я не уверен, что может быть способ указать логический тип?
  2. Как мне записать данные?т.е. в каком формате я записываю метку времени в группу?Для временной отметки INT96 я предполагаю, что должен написать некоторый двоичный тип?

Вот упрощенная версия моего кода, которая демонстрирует, что я пытаюсь сделать.В частности, обратите внимание на комментарии «TODO», это две точки в коде, которые соответствуют приведенным выше вопросам.

List<Type> fields = new ArrayList<>();
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int32_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "double_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.STRING, "string_col", null));

// TODO: 
//   Specify the TIMESTAMP type. 
//   How? INT96 primitive type? Is there a logical timestamp type I can use w/ MessageType schema?
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT96, "timestamp_col", null)); 

MessageType schema = new MessageType("input", fields);

// initialize writer
Configuration configuration = new Configuration();
configuration.setQuietMode(true);
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
  new Path("output.parquet"),
  new GroupWriteSupport(),
  CompressionCodecName.SNAPPY,
  ParquetWriter.DEFAULT_BLOCK_SIZE,
  ParquetWriter.DEFAULT_PAGE_SIZE,
  1048576,
  true,
  false,
  ParquetProperties.WriterVersion.PARQUET_1_0,
  configuration
);

// write CSV data
CSVParser parser = CSVParser.parse(new File(csv), StandardCharsets.UTF_8, CSVFormat.TDF.withQuote(null));
ArrayList<String> columns = new ArrayList<>(schemaMap.keySet());
int colIndex;
int rowNum = 0;
for (CSVRecord csvRecord : parser) {
  rowNum ++;
  Group group = f.newGroup();
  colIndex = 0;
  for (String record : csvRecord) {
    if (record == null || record.isEmpty() || record.equals( "NULL")) {
      colIndex++;
      continue;
    }


    record = record.trim();
    String type = schemaMap.get(columns.get(colIndex)).get("type").toString();
    MessageTypeConverter.addTypeValueToGroup(type, record, group, colIndex++);

    switch (colIndex) {
      case 0: // int32
        group.add(colIndex, Integer.parseInt(record));
        break;
      case 1: // double
        group.add(colIndex, Double.parseDouble(record));
        break;
      case 2: // string
        group.add(colIndex, record);
        break;
      case 3:
        // TODO: convert CSV string value to TIMESTAMP type (how?)
        throw new NotImplementedException();
    }
  }
  writer.write(group);
}
writer.close();

Ответы [ 2 ]

0 голосов
/ 14 февраля 2019

Я понял это, используя этот код из spark sql в качестве ссылки.

Бинарное кодирование INT96 разделено на 2 части: первые 8 байтов - это наносекунды с полуночи. Последние 4 байтаЭто Юлианский день

String value = "2019-02-13 13:35:05";

final long NANOS_PER_HOUR = TimeUnit.HOURS.toNanos(1);
final long NANOS_PER_MINUTE = TimeUnit.MINUTES.toNanos(1);
final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);

// Parse date
SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
cal.setTime(parser.parse(value));

// Calculate Julian days and nanoseconds in the day
LocalDate dt = LocalDate.of(cal.get(Calendar.YEAR), cal.get(Calendar.MONTH)+1, cal.get(Calendar.DAY_OF_MONTH));
int julianDays = (int) JulianFields.JULIAN_DAY.getFrom(dt);
long nanos = (cal.get(Calendar.HOUR_OF_DAY) * NANOS_PER_HOUR)
        + (cal.get(Calendar.MINUTE) * NANOS_PER_MINUTE)
        + (cal.get(Calendar.SECOND) * NANOS_PER_SECOND);

// Write INT96 timestamp
byte[] timestampBuffer = new byte[12];
ByteBuffer buf = ByteBuffer.wrap(timestampBuffer);
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(nanos).putInt(julianDays);

// This is the properly encoded INT96 timestamp
Binary tsValue = Binary.fromReusedByteArray(timestampBuffer);

0 голосов
/ 13 февраля 2019
  1. В метках времени INT96 используется физический тип INT96 без какого-либо логического типа, поэтому не комментируйте их ничем.
  2. Если вас интересует структура метки времени INT96, посмотрите здесь .Если вы хотите увидеть пример кода, который преобразуется в этот формат и из него, взгляните на этот файл из Hive .
...