У меня есть много маленьких файлов рассола, которые содержат pd.Dataframe()
объекты. Мне нужно загрузить их в dataprame pyspark. Во-первых, я не могу найти способ загружать файлы PickPack напрямую из adls / dbfs в фрейм данных pyspark, используя df = spark.read.format
. Поскольку я не смог найти такой способ, я решил загрузить файлы, используя pickle.load
из каталога dbfs, использовать pd.concat
, чтобы объединить все мои объекты df, а затем преобразовать pandas dataframe в pyspakr dataframe. Но это очень неэффективно, есть ли другой способ сделать это?
Пока у меня есть:
# list of file names
pickle_list = [f.name for f in dbutils.fs.ls(dbfs_path)]
df_list = list()
t0 = time.time()
# load dataframes and append to a list
for p in pickle_list:
df_temp = pickle.load(open('/dbfs/monitoring_prickles/' + p, 'rb'))
df_list.append(df_temp)
t1 = time.time() - t0
final_df = pd.concat(df_list)
Я определяю следующую схему ( Преобразование Pandas кадра данных в Spark ошибка кадра данных ) и конвертируйте так:
from pyspark.sql.types import *
# Auxiliar functions
def equivalent_type(f):
if f == 'datetime64[ns]': return DateType()
elif f == 'int64': return LongType()
elif f == 'int32': return IntegerType()
elif f == 'float64': return FloatType()
else: return StringType()
def define_structure(string, format_type):
try: typo = equivalent_type(format_type)
except: typo = StringType()
return StructField(string, typo)
# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
columns = list(pandas_df.columns)
types = list(pandas_df.dtypes)
struct_list = []
for column, typo in zip(columns, types):
struct_list.append(define_structure(column, typo))
p_schema = StructType(struct_list)
return sqlContext.createDataFrame(pandas_df, p_schema)
final_sdf = pandas_to_spark(final_df)
Я получаю предупреждение:
/databricks/spark/python/pyspark/sql/session.py:776: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true; however, failed by the reason below:
Expected a string or bytes object, got a 'int' object
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.
warnings.warn(msg)
Несколько вопросов: (1) есть ли лучший способ сделать это так как я очень новичок в Databricks / PySpark? (2) почему display(final_sdf)
занимает больше времени, чем просто выполнение 'final_df
', и (3) что такое оптимизация стрелок и предупреждение? Спасибо!