Ошибка означает, что вы не можете использовать Spark dataframe в UDF. Но поскольку ваш фрейм данных, содержащий имена баз данных и таблиц, скорее всего, мал, достаточно просто взять цикл Python for
, ниже приведены некоторые методы, которые могут помочь получить ваши данные:
from pyspark.sql import Row
# assume dfs is the df containing database names and table names
dfs.printSchema()
root
|-- database: string (nullable = true)
|-- tableName: string (nullable = true)
Method-1: используйте df.dtypes
Запустите sql select * from database.tableName limit 1
, чтобы сгенерировать df и вернуть его dtypes, преобразовать его в StringType ().
data = []
DRow = Row('database', 'tableName', 'dtypes')
for row in dfs.collect():
try:
dtypes = spark.sql('select * from `{}`.`{}` limit 1'.format(row.database, row.tableName)).dtypes
data.append(DRow(row.database, row.tableName, str(dtypes)))
except Exception, e:
print("ERROR from {}.{}: [{}]".format(row.database, row.tableName, e))
pass
df_dtypes = spark.createDataFrame(data)
# DataFrame[database: string, tableName: string, dtypes: string]
Примечание:
с использованием dtypes
вместо str(dtypes)
получит следующую схему, где _1
и _2
равны col_name
и col_dtype
соответственно:
root
|-- database: string (nullable = true)
|-- tableName: string (nullable = true)
|-- dtypes: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- _1: string (nullable = true)
| | |-- _2: string (nullable = true)
при использовании этого метода каждая таблица будет иметь только одну строку. для следующих двух методов каждый col_type таблицы будет иметь свою собственную строку.
Метод 2: используйте описать
, вы также можете получить эту информацию из запуска spark.sql("describe tableName")
, с помощью которого вы напрямую получите фрейм данных, а затем используйте функцию Reduce для объединения результатов из всех таблиц.
from functools import reduce
def get_df_dtypes(db, tb):
try:
return spark.sql('desc `{}`.`{}`'.format(db, tb)) \
.selectExpr(
'"{}" as `database`'.format(db)
, '"{}" as `tableName`'.format(tb)
, 'col_name'
, 'data_type')
except Exception, e:
print("ERROR from {}.{}: [{}]".format(db, tb, e))
pass
# an example table:
get_df_dtypes('default', 'tbl_df1').show()
+--------+---------+--------+--------------------+
|database|tableName|col_name| data_type|
+--------+---------+--------+--------------------+
| default| tbl_df1| array_b|array<struct<a:st...|
| default| tbl_df1| array_d| array<string>|
| default| tbl_df1|struct_c|struct<a:double,b...|
+--------+---------+--------+--------------------+
# use reduce function to union all tables into one df
df_dtypes = reduce(lambda d1, d2: d1.union(d2), [ get_df_dtypes(row.database, row.tableName) for row in dfs.collect() ])
Метод 3: используйте spark.catalog.listColumns ()
Usespark.catalog.listColumns (), которая создает список collections.Column
объектов, извлекает name
и dataType
и объединяет данные. результирующий кадр данных нормализуется с помощью col_name и col_dtype в их собственных столбцах (так же, как при использовании Method-2 ).
data = []
DRow = Row('database', 'tableName', 'col_name', 'col_dtype')
for row in dfs.select('database', 'tableName').collect():
try:
for col in spark.catalog.listColumns(row.tableName, row.database):
data.append(DRow(row.database, row.tableName, col.name, col.dataType))
except Exception, e:
print("ERROR from {}.{}: [{}]".format(row.database, row.tableName, e))
pass
df_dtypes = spark.createDataFrame(data)
# DataFrame[database: string, tableName: string, col_name: string, col_dtype: string]
A Примечание: различные распределения Spark /версии могут иметь результат, отличный от describe tbl_name
и других команд при получении метаданных, убедитесь, что в запросах используются правильные имена столбцов.