Предположим, у вас были следующие два кадра данных:
source.show()
#+---+---+---+---+
#| a| c| d| e|
#+---+---+---+---+
#| A| C| 0| E|
#+---+---+---+---+
target.show()
#+---+---+---+---+
#| a| b| c| d|
#+---+---+---+---+
#| A| B| C| 1|
#+---+---+---+---+
Со следующими типами данных:
print(source.dtypes)
#[('a', 'string'), ('c', 'string'), ('d', 'string'), ('e', 'string')]
print(target.dtypes)
#[('a', 'string'), ('b', 'string'), ('c', 'string'), ('d', 'int')]
Если я правильно понимаю вашу логику, вам поможет следующее понимание списка:
from pyspark.sql.functions import col, lit
new_source = source.select(
*(
[
col(t).cast(d) if t in source.columns else lit(None).alias(t)
for t, d in target.dtypes
] +
[s for s in source.columns if s not in target.columns]
)
)
new_source.show()
new_source.show()
#+---+----+---+---+---+
#| a| b| c| d| e|
#+---+----+---+---+---+
#| A|null| C| 0| E|
#+---+----+---+---+---+
И полученный вывод будет иметь следующую схему:
new_source.printSchema()
#root
# |-- a: string (nullable = true)
# |-- b: null (nullable = true)
# |-- c: string (nullable = true)
# |-- d: integer (nullable = true)
# |-- e: string (nullable = true)
Как видите, тип данных столбца d
изменен с string
на integer
в соответствии со схемой целевой таблицы.
Логика заключается в том, чтобы сначала перебрать столбцы в target
и выбрать их, если они существуют в source.columns
, или создать столбец из null
s, если он не существует. Затем добавьте столбцы из source
, которых нет в target
.