Проблема
Я написал Apache Spark DataFrame в виде файла паркета для приложения глубокого обучения в среде Python; В настоящее время у меня возникают проблемы с реализацией базовых c примеров как petastorm (после этой записной книжки ), так и horovod , при чтении вышеупомянутого файла, а именно. DataFrame имеет следующий тип: DataFrame[features: array<float>, next: int, weight: int]
(так же, как в записной книжке DataBricks, у меня features
был VectorUDT, который я преобразовал в массив).
В обоих случаях Apache Стрелка выдает ArrowIOError : Invalid parquet file. Corrupt footer.
ошибка.
То, что я нашел до сих пор
Я обнаружил в этот вопрос и в этот PR , что по состоянию на версию 2.0 Spark не пишет * Файлы 1021 * или _common_metadata
, если для spark.hadoop.parquet.enable.summary-metadata
не установлено значение true
в конфигурации Spark; эти файлы действительно отсутствуют.
Таким образом, я попытался переписать свой DataFrame в этой среде, но файл _common_metadata
по-прежнему отсутствует. Также работает явная передача схемы в petastorm при построении считывателя (например, передача schema_fields
в make_batch_reader
; это проблема для horovod, поскольку в конструкторе horovod.spark.keras.KerasEstimator
такого параметра нет).
Как бы я мог, если это вообще возможно, сделать так, чтобы Spark выводил эти файлы, или в Arrow, чтобы вывести схему, как это делает Spark?
Минимальный пример с хоровод
# Saving df
print(spark.config.get('spark.hadoop.parquet.enable.summary-metadata')) # outputs 'true'
df.repartition(10).write.mode('overwrite').parquet(path)
# ...
# Training
import horovod.spark.keras as hvd
from horovod.spark.common.store import Store
model = build_model()
opti = Adadelta(learning_rate=0.015)
loss='sparse_categorical_crossentropy'
store = Store().create(prefix_path=prefix_path,
train_path=train_path,
val_path=val_path)
keras_estimator = hvd.KerasEstimator(
num_proc=16,
store=store,
model=model,
optimizer=opti,
loss=loss,
feature_cols=['features'],
label_cols=['next'],
batch_size=auto_steps_per_epoch,
epochs=auto_nb_epochs,
sample_weight_col='weight'
)
keras_model = keras_estimator.fit_on_parquet() # Fails here with ArrowIOError