Преобразование RDD в DataFrame с помощью Pyspark возвращает Dataframe со всеми нулями - PullRequest
0 голосов
/ 14 апреля 2020

Я пытаюсь преобразовать значение ключа RDD в DataFrame Pyspark с помощью метода sqlContext.createDataFrame (). Если я не объявляю схему явно, я получаю сообщение об ошибке, потому что некоторые столбцы из RDD имеют нулевые значения.

Вот что я запускаю в оболочке Pyspark. Все работает как положено, пока я не попытаюсь взять Pandas фрейм данных, который у меня есть в Pyspark, и попытаться преобразовать его в Spark Dataframe.

import parser
import pandas as pd
import flat_table
import itertools
import importlib
import pathlib
import re
import os

from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.functions import array, col, explode, struct, lit


URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration

subdirectory = "/path/to/directory"

fs = FileSystem.get(URI("hdfs://localfilesystem:port"), Configuration())
status = fs.globStatus(Path(subdirectory))

def fuse_sections(document_data: dict) -> DataFrame:
    all_section_data = pd.concat(document_data)
    return all_section_data

declared_schema = StructType([StructField('name', BinaryType(), True), 
StructField('date', BinaryType(), True)])

for fileStatus in status:
    pathname = str(fileStatus.getPath())
    xml_rdd = sc.wholeTextFiles(pathname, use_unicode=False)
    tree_rdd = xml_rdd.mapValues(parser)
    if tree_rdd.isEmpty() == True:
        pass
    else:
        parsed_rdd = data_dict_rdd.mapValues(fuse_sections)
        values_rdd = parsed_rdd.values()

        #The following print statement shows that the values_rdd is compiling as expected
        print(values_rdd.collect())
        spark_df = sqlContext.createDataFrame(values_rdd, declared_schema)

        #The following dataframe shows all nulls
        spark_df.show(200)

Является ли тот факт, что я пытаюсь go из Pandas Dataframe в RDD в Spark Dataframe, где он портится и приводит ко всем нулям?

...