У меня есть инструмент, который использует org.apache.parquet.hadoop.ParquetWriter для преобразования файлов данных CSV в файлы данных паркета.
В настоящее время он обрабатывает только int32
,double
и string
Мне нужно поддерживать логический тип паркета timestamp
(аннотированный как int96), и я теряюсь, как это сделать, потому что не могу найти точную спецификацию в Интернете.
Похоже, что эта временная метка (int96) встречается редко и плохо поддерживается.Я нашел очень мало деталей спецификации в Интернете. Этот github README утверждает, что:
Метки времени, сохраненные как int96, состоят из наносекунд в дне (первые 8 байт) и юлианском дне (последние 4 байта).
В частности:
- Какой паркет Тип использовать для столбца в схеме MessageType ?Я предполагаю, что должен использовать примитивный тип,
PrimitiveTypeName.INT96
, но я не уверен, что может быть способ указать логический тип? - Как мне записать данные?т.е. в каком формате я записываю метку времени в группу?Для временной отметки INT96 я предполагаю, что должен написать некоторый двоичный тип?
Вот упрощенная версия моего кода, которая демонстрирует, что я пытаюсь сделать.В частности, обратите внимание на комментарии «TODO», это две точки в коде, которые соответствуют приведенным выше вопросам.
List<Type> fields = new ArrayList<>();
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int32_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.DOUBLE, "double_col", null));
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.STRING, "string_col", null));
// TODO:
// Specify the TIMESTAMP type.
// How? INT96 primitive type? Is there a logical timestamp type I can use w/ MessageType schema?
fields.add(new PrimitiveType(Type.Repetition.OPTIONAL, PrimitiveTypeName.INT96, "timestamp_col", null));
MessageType schema = new MessageType("input", fields);
// initialize writer
Configuration configuration = new Configuration();
configuration.setQuietMode(true);
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
new Path("output.parquet"),
new GroupWriteSupport(),
CompressionCodecName.SNAPPY,
ParquetWriter.DEFAULT_BLOCK_SIZE,
ParquetWriter.DEFAULT_PAGE_SIZE,
1048576,
true,
false,
ParquetProperties.WriterVersion.PARQUET_1_0,
configuration
);
// write CSV data
CSVParser parser = CSVParser.parse(new File(csv), StandardCharsets.UTF_8, CSVFormat.TDF.withQuote(null));
ArrayList<String> columns = new ArrayList<>(schemaMap.keySet());
int colIndex;
int rowNum = 0;
for (CSVRecord csvRecord : parser) {
rowNum ++;
Group group = f.newGroup();
colIndex = 0;
for (String record : csvRecord) {
if (record == null || record.isEmpty() || record.equals( "NULL")) {
colIndex++;
continue;
}
record = record.trim();
String type = schemaMap.get(columns.get(colIndex)).get("type").toString();
MessageTypeConverter.addTypeValueToGroup(type, record, group, colIndex++);
switch (colIndex) {
case 0: // int32
group.add(colIndex, Integer.parseInt(record));
break;
case 1: // double
group.add(colIndex, Double.parseDouble(record));
break;
case 2: // string
group.add(colIndex, record);
break;
case 3:
// TODO: convert CSV string value to TIMESTAMP type (how?)
throw new NotImplementedException();
}
}
writer.write(group);
}
writer.close();