Блоки данных Pyspark + Как вставить схему данных в виде столбца в кадре данных - PullRequest
0 голосов
/ 19 февраля 2020

У меня есть функция, которая генерирует фрейм данных:

def getdata():
    schema_1 = StructType([ StructField('path_name', StringType(), True),
                           StructField('age1', IntegerType(), True), 
                           StructField('age2', IntegerType(), True), 
                           StructField('age3', IntegerType(), True)])
    data = [('dbfs/123/sd.zip',1,2,3),('dbfs/123/ab.zip',5,6,7)]
    df = spark.createDataFrame(data,schema_1)
    return df

Мне нужно вставить эту схему фрейма данных в столбец другого фрейма данных. Результат должен выглядеть примерно так:

root
 |-- filename: string (nullable = true)
 |-- parsed: struct (nullable = true)
 |    |-- path_name: string (nullable = true)
 |    |-- age1: integer (nullable = true)
 |    |-- age2: integer (nullable = true)
 |    |-- age3: integer (nullable = true)

Я пытался сделать это с помощью udf:

@udf(schema_1)
def my_udf(schema):
  data = getdata(schema)
  List_of_rows = data.collect()  
  return List_of_rows

И затем вставить его в другой созданный мной фрейм данных. Весь код, который я использую:

from pyspark.sql.types import *
from pyspark.sql.functions import col
import pandas as pd

schema_1 = StructType([ StructField('path_name', StringType(), True),
                           StructField('age1', IntegerType(), True), 
                           StructField('age2', IntegerType(), True), 
                           StructField('age3', IntegerType(), True)])

def getdata(schema_1):    
    data = [('dbfs/123/sd.zip',1,2,3),('dbfs/123/ab.zip',5,6,7)]
    df = spark.createDataFrame(data,schema_1)
    return df


@udf(schema_1)
def my_udf(scheme):
  data = getdata(scheme)
  List_of_rows = data.collect()  
  return List_of_rows

def somefunction(schema_1):    
  pd_df = pd.DataFrame(['path'], columns = ["filename"])  
  return (spark.createDataFrame(pd_df)
          .withColumn('parsed', my_udf(schema_1))
         )

df_2 = somefunction(schema_1)

display(df_2)

Однако я получаю сообщение об ошибке,

Ошибка рендеринга вывода: Сообщить об ошибке. PicklingError: Не удалось сериализовать объект: Исключение: похоже, вы пытаетесь сослаться на SparkContext из широковещательной переменной, действия или преобразования. SparkContext может использоваться только в драйвере, а не в коде, который он запускает на рабочих. Для получения дополнительной информации см. SPARK-5063.

, и я также думаю, что это не лучший подход. Есть идеи ??

1 Ответ

0 голосов
/ 19 февраля 2020

Разве вы не можете просто создать собственную схему? Google для этого. Также см. Пример кода, который создаст (принудительную) пользовательскую схему для вас, даже если первые строки (заголовки) отсутствуют или неверны.

from  pyspark.sql.functions import input_file_name
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

customSchema = StructType([ \
StructField("asset_id", StringType(), True), \
StructField("price_date", StringType(), True), \

etc., etc., etc., 

StructField("close_price", StringType(), True), \
StructField("filename", StringType(), True)])
...