Я хотел бы создать многослойные файлы TFrecord из большого DataFrame на основе определенного условия, для которого я использую write.partitionBy()
.Я также использую соединитель tenorflow в SPARK, но это, очевидно, не работает вместе с операцией write.partitionBy()
.Поэтому я не нашел другого способа, кроме как попытаться работать в два этапа:
- Переразметить фрейм данных в соответствии с моими условиями, используя
partitionBy()
, и записать полученные разделы в файлы паркета. - Прочитайте эти файлы паркета, чтобы преобразовать их в файлы TFrecord с помощью плагина Tenorflow-Connector.
Это второй шаг, который я не могу сделать эффективно.Моя идея состояла в том, чтобы прочитать в отдельных файлах паркета исполнителей и немедленно записать их в файлы TFrecord.Но для этого необходим доступ к SQLContext, который может быть выполнен только в драйвере ( обсуждается здесь ), поэтому не параллельно.Я хотел бы сделать что-то вроде этого:
# List all parquet files to be converted
import glob, os
files = glob.glob('/path/*.parquet'))
sc = SparkSession.builder.getOrCreate()
sc.parallelize(files, 2).foreach(lambda parquetFile: convert_parquet_to_tfrecord(parquetFile))
Могу ли я построить функцию convert_parquet_to_tfrecord
, которая сможет сделать это на исполнителях?
Я также пытался просто использоватьподстановочный знак при чтении всех файлов паркета:
SQLContext(sc).read.parquet('/path/*.parquet')
Это действительно читает все файлы паркета, но, к сожалению, не в отдельных разделах.Похоже, что оригинальная структура теряется, поэтому мне не поможет, если я хочу, чтобы точное содержимое отдельных файлов паркета было преобразовано в файлы TFrecord.
Любые другие предложения?