Я пытаюсь сгенерировать файлы Parquet с помощью Pyspark.Я обнаружил, что в метаданных сгенерированных файлов Parquet статистика столбцов TimestampType и DecimalType не задана (столбцы IntegerType и DateType подходят).
Мне нужна статистика (min и max) для фильтрациигруппы строк при чтении файлов паркета с помощью PyArrow.
Я использую pyspark 2.4.0 для генерации файлов паркета и pyarrow 0.12.1 для их чтения:
import datetime
from decimal import Decimal
from pyarrow import parquet
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql.types import (
StructField,
StructType,
IntegerType,
DateType,
TimestampType,
DecimalType,
)
data = [
(100, datetime.date(2019, 3, 1), datetime.datetime(2019, 3, 1, 1), Decimal('100.0')),
(200, datetime.date(2019, 3, 2), datetime.datetime(2019, 3, 2, 1), Decimal('200.0')),
(300, datetime.date(2019, 3, 3), datetime.datetime(2019, 3, 3, 1), Decimal('300.0')),
(400, datetime.date(2019, 3, 4), datetime.datetime(2019, 3, 4, 1), Decimal('400.0')),
]
columns = [
StructField('int_column', IntegerType()),
StructField('date_column', DateType()),
StructField('timestamp_column', TimestampType()),
StructField('decimal_column', DecimalType(10, 2)),
]
schema = StructType(columns)
spark_context = SparkContext(conf=SparkConf())
sql_context = SQLContext(spark_context)
rdd = spark_context.parallelize(data)
df = sql_context.createDataFrame(rdd, schema=schema)
db_path = '/tmp/parquet_test'
df.write.parquet(db_path, compression='gzip')
dataset = parquet.ParquetDataset(db_path)
meta_data = dataset.pieces[1].get_metadata(parquet.ParquetFile)
row_group = meta_data.row_group(0)
for col_index in range(len(columns)):
column = row_group.column(col_index)
print(f'column name: {column.path_in_schema}, is_stats_set: {column.is_stats_set}')
Выводкод:
column name: int_column, is_stats_set: True
column name: date_column, is_stats_set: True
column name: timestamp_column, is_stats_set: False
column name: decimal_column, is_stats_set: False
Я не знаю, почему статистика последних 2 столбцов не устанавливается PySpark.
Я что-то упустил в коде, или это ожидаемое поведение PySpark?Если это последнее, мне нужно будет самостоятельно конвертировать метку времени в целое число.