PySpark: создание RDD-> DF-> Parquet со схемой, имеющей 1000 полей, но строк с переменным числом столбцов - PullRequest
2 голосов
/ 12 марта 2019

Я пытаюсь прочитать индекс ElasticSearch , который содержит миллионы документов, каждый из которых имеет переменное количество полей. У меня есть схема, которая имеет 1000 полей, каждое из которых имеет свое имя и тип.

Теперь, когда я создаю RDD корыто ES-Hadoop соединитель и позже конвертирую в DataFrame , указав схему, он не может сказать -

Входная строка не имеет ожидаемого количества значений, требуемых схема

У меня есть несколько вопросов. 1. Можно ли иметь RDD / DF со строками, содержащими переменное количество полей? Если нет, что является альтернативой, кроме добавления нулевого значения для пропущенных полей в каждом столбце?

  1. Я вижу, что по умолчанию Spark преобразует все в StringType, когда я использую sc.newAPIHadoopRDD() вызов. Как я могу типизировать их, чтобы исправить тип, основанный на имени поля, которое у меня есть в моей схеме? Какое-то отображение?

  2. Я хочу написать это в формате Parquet со схемой, добавленной в файл. Что происходит с этими пропущенными полями по сравнению со схемой, имеющей тысячи полей.

1 Ответ

1 голос
/ 12 марта 2019
  1. Вы не можете иметь переменное количество столбцов, но вы можете использовать один столбец типа коллекции, например, Array или Map, который в python соответствует словарю. Это позволяет вам хранить данные переменной длины в вашем столбце. В противном случае да, вам нужно иметь значение для каждого столбца в вашей схеме. Вы обычно заполняете отсутствующие значения нулями.

  2. Если у вас уже есть фрейм данных, и у вас есть функция get_column_type, которая получает имя типа из имени столбца, вы можете переписать весь фрейм данных следующим образом:

    import pyspark.sql.functions as F
    select_expressions = [ F.col(column_name).cast(get_column_type(column_name)) for column_name in column_list]
    recasted_df = df.select(*select_expressions)
    
  3. Файл паркета будет иметь те столбцы, которые есть в вашем кадре данных. Если вам нужно 1000 полей в файле, они должны быть в кадре данных, поэтому вам нужно будет заполнить пропущенные значения нулями или другим значением.

Теперь, если вы сложите все эти точки вместе, вы, вероятно, захотите сделать что-то вроде этого:

  • Считайте каждый эластичный документ в строку с полем id и полем doc типа MapType.
  • explode поле документа, так что теперь у вас есть 3 столбца: id, key и value, с одной строкой для каждой клавиши в каждом документе. На этом этапе вы можете записать в файл паркета и закончить процесс.

Если вам нужен кадр данных с полной схемой, вам нужно выполнить следующие дополнительные шаги:

  • Поворот результата для создания только одной строки для каждого идентификатора и столбца для каждого ключа в документе с соответствующим значением: pivoted_df = df.groupBy('id').pivot('key').agg(F.first('value')
  • Этот фрейм данных имеет все поля, присутствующие в данных. Если вы знаете полную схему, вы можете добавить фиктивные столбцы для отсутствующих: df = df.withColumn('new_column', lit(None).cast(StringType())
  • Наконец, измените столбцы с кодом в точке 2 и удалите столбец id. Вы можете написать это в паркет, и он будет иметь все столбцы в вашей большой схеме.
...