Spark: чтение файлов CSV из списка путей в строке DataFrame - PullRequest
0 голосов
/ 09 ноября 2018

У меня есть Spark DataFrame следующим образом:

# ---------------------------------
# - column 1 - ...  -   column 5  -
# ---------------------------------
# - ...             - Array of paths

Столбцы с 1 по 4 содержат строки, а пятый столбец содержит список строк, которые на самом деле являются путями к CSV-файлам, которые я хочу прочитать как Spark Dataframes. Я не могу найти, чтобы прочитать их. Вот упрощенная версия с одним столбцом и столбцом со списком путей:

from pyspark.sql import SparkSession,Row

spark = SparkSession \
        .builder \
        .appName('test') \
        .getOrCreate()

simpleRDD = spark.sparkContext.parallelize(range(10))
simpleRDD = simpleRDD.map(lambda x: Row(**{'a':x,'paths':['{}_{}.csv'.format(y**2,y+1) for y in range(x+1)]}))

simpleDF = spark.createDataFrame(simpleRDD)
print(simpleDF.head(5))

Это дает:

[Row(a=0, paths=['0_1.csv']),  
 Row(a=1, paths=['0_1.csv', '1_2.csv']),  
 Row(a=2, paths=['0_1.csv', '1_2.csv', '4_3.csv']),  
 Row(a=3, paths=['0_1.csv', '1_2.csv', '4_3.csv', '9_4.csv']),  
 Row(a=4, paths=['0_1.csv', '1_2.csv', '4_3.csv', '9_4.csv', '16_5.csv'])]

Я бы тогда хотел сделать что-то вроде этого:

simpleDF = simpleDF.withColumn('data',spark.read.csv(simpleDF.paths))

... но это, конечно, не работает.

Ответы [ 2 ]

0 голосов
/ 11 ноября 2018

Я не уверен, как вы собираетесь хранить DataFrame объекты, как только вы прочитаете их по их пути, но если получит доступ к значениям в вашем DataFrame столбце, вы можно использовать метод .collect(), чтобы вернуть DataFrame в виде списка Row объектов (как RDD).

Каждый объект Row имеет метод .asDict(), который преобразует его в объект Python dictionary. Оказавшись там, вы можете получить доступ к значениям, проиндексировав словарь по его ключу.

Предполагая, что вы храните возвращенный DataFrames в списке, вы можете попробовать следующее:

# collect the DataFrame into a list of Rows
rows = simpleRDD.collect()

# collect all the values in your `paths` column
# (note that this will return a list of lists)

paths = map(lambda row: row.asDict().get('paths'), rows)

# flatten the list of lists
paths_flat = [path for path_list in paths for path in path_list]

# get the unique set of paths 
paths_unique = list(set(paths_flat))

# instantiate an empty dictionary in which to collect DataFrames

dfs_dict = []
for path in paths_unique:
    dfs_dict[path] = spark.read.csv(path)

Ваш dfs_dict теперь будет содержать все ваши DataFrames. Чтобы получить DataFrame определенного пути, вы можете получить к нему доступ, используя путь в качестве словарного ключа:

df_0_01 = dfs_dict['0_1.csv']
0 голосов
/ 10 ноября 2018
from pyspark.sql import SparkSession,Row

from pyspark.sql.types import *

spark = SparkSession \
        .builder \
        .appName('test') \
        .getOrCreate()

inp=[['a','b','c','d',['abc\t1.txt','abc\t2.txt','abc\t3.txt','abc\t4.txt','abc\t5.txt',]],
            ['f','g','h','i',['def\t1.txt','def\t2.txt','def\t3.txt','def\t4.txt','def\t5.txt',]],
            ['k','l','m','n',['ghi\t1.txt','ghi\t2.txt','ghi\t3.txt','ghi\t4.txt','ghi\t5.txt',]]
           ]

inp_data=spark.sparkContext.parallelize(inp)

##Defining the schema

schema = StructType([StructField('field1',StringType(),True),
                      StructField('field2',StringType(),True),
                      StructField('field3',StringType(),True),
                      StructField('field4',StringType(),True),
                      StructField('field5',ArrayType(StringType(),True))
                     ])

## Create the Data frames

dataframe=spark.createDataFrame(inp_data,schema)
dataframe.createOrReplaceTempView("dataframe")
dataframe.select("field5").filter("field1='a'").show()
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...