Переупорядочить исходные столбцы данных Spark в соответствии с порядком целевого кадра данных в PySpark - PullRequest
0 голосов
/ 27 июня 2019

У меня фиксированный порядок Spark DataFrame из целевой таблицы:

Целевой кадр данных Spark (строка col1, строка col2 int, строка col3, строка col4 double)

Теперь, если исходные данные поступают в беспорядочном порядке:

Source Spark Dataframe (строка col3, col2 int, col4 double, строка col1).

Как я могу изменить исходный DataFrame, чтобы он соответствовал порядку столбцов целевого DataFrame, используя PySpark?

Исходный кадр данных искры должен быть переупорядочен, как показано ниже, чтобы соответствовать целевому кадру данных:

Выход:

Обновлен исходный кадр данных Spark (строка col1, строка col2, строка col3, строка col4 double)

Сценарий 2:

Исходный кадр данных = [a, c, d, e]

Целевой фрейм данных = [a, b, c, d]

В этом случае исходный DataFrame должен быть переставлен на [a,b,c,d,e]

  • Сохраняйте порядок целевых столбцов
  • Изменить типы данных исходного столбца в соответствии с целевым фреймом данных
  • Добавить новые столбцы в конце
  • Если целевой столбец отсутствует в исходных столбцах, тогда этот столбец все равно следует добавить, но заполнить его значениями null.

В приведенном выше примере после перестановки исходного DataFrame в него будет добавлен столбец b со значениями null.

Это гарантирует, что когда мы используем saveAsTable, исходный DataFrame может быть легко помещен в таблицу, не нарушая существующую таблицу.

1 Ответ

1 голос
/ 27 июня 2019

Предположим, у вас были следующие два кадра данных:

source.show()
#+---+---+---+---+
#|  a|  c|  d|  e|
#+---+---+---+---+
#|  A|  C|  0|  E|
#+---+---+---+---+

target.show()
#+---+---+---+---+
#|  a|  b|  c|  d|
#+---+---+---+---+
#|  A|  B|  C|  1|
#+---+---+---+---+

Со следующими типами данных:

print(source.dtypes)
#[('a', 'string'), ('c', 'string'), ('d', 'string'), ('e', 'string')]

print(target.dtypes)
#[('a', 'string'), ('b', 'string'), ('c', 'string'), ('d', 'int')]

Если я правильно понимаю вашу логику, вам поможет следующее понимание списка:

from pyspark.sql.functions import col, lit

new_source = source.select(
    *(
        [
            col(t).cast(d) if t in source.columns else lit(None).alias(t) 
            for t, d in target.dtypes
        ] +
        [s for s in source.columns if s not in target.columns]
    )
)

new_source.show()

new_source.show()
#+---+----+---+---+---+
#|  a|   b|  c|  d|  e|
#+---+----+---+---+---+
#|  A|null|  C|  0|  E|
#+---+----+---+---+---+

И полученный вывод будет иметь следующую схему:

new_source.printSchema()
#root
# |-- a: string (nullable = true)
# |-- b: null (nullable = true)
# |-- c: string (nullable = true)
# |-- d: integer (nullable = true)
# |-- e: string (nullable = true)

Как видите, тип данных столбца d изменен с string на integer в соответствии со схемой целевой таблицы.

Логика заключается в том, чтобы сначала перебрать столбцы в target и выбрать их, если они существуют в source.columns, или создать столбец из null s, если он не существует. Затем добавьте столбцы из source, которых нет в target.

...