__getnewargs__ ошибка при использовании udf в Pyspark - PullRequest
0 голосов
/ 12 октября 2019

Существует фрейм данных с 2 столбцами (db и tb): db обозначает базу данных, а tb обозначает tableName этой базы данных.

   +--------------------+--------------------+
   |            database|           tableName|
   +--------------------+--------------------+
   |aaaaaaaaaaaaaaaaa...|    tttttttttttttttt|
   |bbbbbbbbbbbbbbbbb...|    rrrrrrrrrrrrrrrr|
   |aaaaaaaaaaaaaaaaa...|  ssssssssssssssssss|

У меня есть следующий метод в python:

 def _get_tb_db(db, tb):
      df = spark.sql("select * from {}.{}".format(db, tb))
      return df.dtypes

и этот udf:

 test = udf(lambda db, tb: _get_tb_db(db, tb), StringType())

при выполнении этого:

 df = df.withColumn("dtype", test(col("db"), col("tb")))

возникает следующая ошибка:

 pickle.PicklingError: Could not serialize object: Py4JError: An 
 error occurred while calling o58.__getnewargs__. Trace:
 py4j.Py4JException: Method __getnewargs__([]) does not exist

Я нашел некоторое обсуждение наstackoverflow: Spark __getnewargs__ error но пока я не уверен, как решить эту проблему? Это ошибка, потому что я создаю другой фрейм данных внутри UDF?

Похоже на решение в ссылке, я попробовал это:

       cols = copy.deepcopy(df.columns)
       df = df.withColumn("dtype", scanning(cols[0], cols[1]))

, но все равно получаю ошибку

Любойрешение?

1 Ответ

1 голос
/ 13 октября 2019

Ошибка означает, что вы не можете использовать Spark dataframe в UDF. Но поскольку ваш фрейм данных, содержащий имена баз данных и таблиц, скорее всего, мал, достаточно просто взять цикл Python for, ниже приведены некоторые методы, которые могут помочь получить ваши данные:

from pyspark.sql import Row

# assume dfs is the df containing database names and table names
dfs.printSchema()
root
 |-- database: string (nullable = true)
 |-- tableName: string (nullable = true)

Method-1: используйте df.dtypes

Запустите sql select * from database.tableName limit 1, чтобы сгенерировать df и вернуть его dtypes, преобразовать его в StringType ().

data = []
DRow = Row('database', 'tableName', 'dtypes')
for row in dfs.collect():
  try:
    dtypes = spark.sql('select * from `{}`.`{}` limit 1'.format(row.database, row.tableName)).dtypes
    data.append(DRow(row.database, row.tableName, str(dtypes)))
  except Exception, e:
    print("ERROR from {}.{}: [{}]".format(row.database, row.tableName, e))
    pass

df_dtypes = spark.createDataFrame(data)
# DataFrame[database: string, tableName: string, dtypes: string]

Примечание:

  • с использованием dtypes вместо str(dtypes) получит следующую схему, где _1 и _2 равны col_name и col_dtype соответственно:

    root
     |-- database: string (nullable = true)
     |-- tableName: string (nullable = true)
     |-- dtypes: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- _1: string (nullable = true)
     |    |    |-- _2: string (nullable = true)
    
  • при использовании этого метода каждая таблица будет иметь только одну строку. для следующих двух методов каждый col_type таблицы будет иметь свою собственную строку.

Метод 2: используйте описать

, вы также можете получить эту информацию из запуска spark.sql("describe tableName"), с помощью которого вы напрямую получите фрейм данных, а затем используйте функцию Reduce для объединения результатов из всех таблиц.

from functools import reduce

def get_df_dtypes(db, tb):
  try:
    return spark.sql('desc `{}`.`{}`'.format(db, tb)) \
                .selectExpr(
                      '"{}" as `database`'.format(db)
                    , '"{}" as `tableName`'.format(tb)
                    , 'col_name'
                    , 'data_type')
  except Exception, e:
    print("ERROR from {}.{}: [{}]".format(db, tb, e))
    pass

# an example table:
get_df_dtypes('default', 'tbl_df1').show()
+--------+---------+--------+--------------------+
|database|tableName|col_name|           data_type|
+--------+---------+--------+--------------------+
| default|  tbl_df1| array_b|array<struct<a:st...|
| default|  tbl_df1| array_d|       array<string>|
| default|  tbl_df1|struct_c|struct<a:double,b...|
+--------+---------+--------+--------------------+

# use reduce function to union all tables into one df
df_dtypes = reduce(lambda d1, d2: d1.union(d2), [ get_df_dtypes(row.database, row.tableName) for row in dfs.collect() ])

Метод 3: используйте spark.catalog.listColumns ()

Usespark.catalog.listColumns (), которая создает список collections.Column объектов, извлекает name и dataType и объединяет данные. результирующий кадр данных нормализуется с помощью col_name и col_dtype в их собственных столбцах (так же, как при использовании Method-2 ).

data = []
DRow = Row('database', 'tableName', 'col_name', 'col_dtype')
for row in dfs.select('database', 'tableName').collect():
  try:
    for col in spark.catalog.listColumns(row.tableName, row.database):
      data.append(DRow(row.database, row.tableName, col.name, col.dataType))
  except Exception, e:
    print("ERROR from {}.{}: [{}]".format(row.database, row.tableName, e))
    pass

df_dtypes = spark.createDataFrame(data)
# DataFrame[database: string, tableName: string, col_name: string, col_dtype: string]

A Примечание: различные распределения Spark /версии могут иметь результат, отличный от describe tbl_name и других команд при получении метаданных, убедитесь, что в запросах используются правильные имена столбцов.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...