Как преобразовать несколько файлов паркета в файлы TFrecord, используя SPARK? - PullRequest
0 голосов
/ 22 января 2019

Я хотел бы создать многослойные файлы TFrecord из большого DataFrame на основе определенного условия, для которого я использую write.partitionBy().Я также использую соединитель tenorflow в SPARK, но это, очевидно, не работает вместе с операцией write.partitionBy().Поэтому я не нашел другого способа, кроме как попытаться работать в два этапа:

  1. Переразметить фрейм данных в соответствии с моими условиями, используя partitionBy(), и записать полученные разделы в файлы паркета.
  2. Прочитайте эти файлы паркета, чтобы преобразовать их в файлы TFrecord с помощью плагина Tenorflow-Connector.

Это второй шаг, который я не могу сделать эффективно.Моя идея состояла в том, чтобы прочитать в отдельных файлах паркета исполнителей и немедленно записать их в файлы TFrecord.Но для этого необходим доступ к SQLContext, который может быть выполнен только в драйвере ( обсуждается здесь ), поэтому не параллельно.Я хотел бы сделать что-то вроде этого:

# List all parquet files to be converted
import glob, os
files = glob.glob('/path/*.parquet'))

sc = SparkSession.builder.getOrCreate()
sc.parallelize(files, 2).foreach(lambda parquetFile: convert_parquet_to_tfrecord(parquetFile))

Могу ли я построить функцию convert_parquet_to_tfrecord, которая сможет сделать это на исполнителях?

Я также пытался просто использоватьподстановочный знак при чтении всех файлов паркета:

SQLContext(sc).read.parquet('/path/*.parquet')

Это действительно читает все файлы паркета, но, к сожалению, не в отдельных разделах.Похоже, что оригинальная структура теряется, поэтому мне не поможет, если я хочу, чтобы точное содержимое отдельных файлов паркета было преобразовано в файлы TFrecord.

Любые другие предложения?

1 Ответ

0 голосов
/ 27 января 2019

Если я правильно понял ваш вопрос, вы хотите записать разделы локально на рабочий диск.

Если это так, то я бы порекомендовал посмотреть spark-tenorflow-connector's инструкции о том, как это сделать.

Это код, который вы ищете (как указано в документации, приведенной выше):

myDataFrame.write.format("tfrecords").option("writeLocality", "local").save("/path")  

В примечании, если выбеспокоит эффективность почему ты используешь pyspark?Вместо этого лучше использовать scala.

...