Загрузка меток (pandas фреймов данных) в фрейм данных pyspark - PullRequest
0 голосов
/ 28 марта 2020

У меня есть много маленьких файлов рассола, которые содержат pd.Dataframe() объекты. Мне нужно загрузить их в dataprame pyspark. Во-первых, я не могу найти способ загружать файлы PickPack напрямую из adls / dbfs в фрейм данных pyspark, используя df = spark.read.format. Поскольку я не смог найти такой способ, я решил загрузить файлы, используя pickle.load из каталога dbfs, использовать pd.concat, чтобы объединить все мои объекты df, а затем преобразовать pandas dataframe в pyspakr dataframe. Но это очень неэффективно, есть ли другой способ сделать это?

Пока у меня есть:

# list of file names
pickle_list = [f.name for f in dbutils.fs.ls(dbfs_path)]
df_list = list()
t0 = time.time()
# load dataframes and append to a list
for p in pickle_list:
  df_temp = pickle.load(open('/dbfs/monitoring_prickles/' + p, 'rb'))
  df_list.append(df_temp)
t1 = time.time() - t0

final_df = pd.concat(df_list)

Я определяю следующую схему ( Преобразование Pandas кадра данных в Spark ошибка кадра данных ) и конвертируйте так:

from pyspark.sql.types import *

# Auxiliar functions
def equivalent_type(f):
    if f == 'datetime64[ns]': return DateType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)


# Given pandas dataframe, it will return a spark's dataframe.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types): 
      struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)

final_sdf = pandas_to_spark(final_df)

Я получаю предупреждение:

/databricks/spark/python/pyspark/sql/session.py:776: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.enabled' is set to true; however, failed by the reason below:
  Expected a string or bytes object, got a 'int' object
Attempting non-optimization as 'spark.sql.execution.arrow.fallback.enabled' is set to true.
  warnings.warn(msg)

Несколько вопросов: (1) есть ли лучший способ сделать это так как я очень новичок в Databricks / PySpark? (2) почему display(final_sdf) занимает больше времени, чем просто выполнение 'final_df', и (3) что такое оптимизация стрелок и предупреждение? Спасибо!

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...