PySpark: данные не всегда соответствуют схеме - логика для изменения данных - PullRequest
0 голосов
/ 20 сентября 2018

Я новичок в PySpark и работаю над сценарием, читая из .csv файлов.

Я четко определил схему ниже, и сценарий работает отлично ... большинство извремя.

Проблема в том, что иногда в файлы попадает значение, которое не соответствует схеме - например, «-» может появиться в целочисленном поле и, следовательно, мы получаем ошибку типа - ошибкаВыдается при достижении df1.show() в сценарии.

Я пытаюсь придумать способ эффективно сказать - если значение не соответствует определенному типу данных, то заменить на ''

Кто-нибудь знает, возможно ли это?Любой совет был бы великолепен!

from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
import argparse, sys
from pyspark.sql import *
from pyspark.sql.functions import *
from datetime import datetime
#create a context that supports hive
def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .master('yarn')\
        .config("hive.metastore.uris", "thrift://serverip:9083")\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session

### START MAIN ###
if __name__ == '__main__':
    spark_session = create_session('testing_files')
    dt_now = datetime.now()

    today_unixtime = long(dt_now.strftime('%s'))
    today_date = datetime.fromtimestamp(today_unixtime).strftime('%Y%m%d')

    twoday_unixtime = long(dt_now.strftime('%s')) - 24*60*60*2
    twoday = datetime.fromtimestamp(twoday_unixtime).strftime('%Y%m%d')

    hourago = long(dt_now.strftime('%s')) - 60*60*4
    hrdate = datetime.fromtimestamp(hourago).strftime('%H')

    schema = [\
        StructField('field1', StringType(), True),\
        StructField('field2',StringType(), True), \
        StructField('field3',IntegerType(), True) \
        ]
    final_structure = StructType(schema)

    df1 = spark_session.read\
        .option("header","false")\
        .option("delimiter", "\t")\
        .csv('hdfs://hdfspath/dt=%s/*/*/*' %today_date, final_structure)

    usercatschema = [\
        StructField('field1', StringType(), True),\
        StructField('field2',StringType(), True), \
        StructField('field3',StringType(), True) \
        ]
    usercat_structure = StructType(usercatschema)

    df2 = spark_session.read\
        .option("header","false")\
        .option("delimiter", "\t")\
        .csv('hdfs://hdfspath/v0/dt=%s/*' %twoday, usercat_structure)

    df1.show()
    df2.show()
    df1.createOrReplaceTempView("dpi")
    df2.createOrReplaceTempView("usercat")

    finaldf = spark_session.sql('''
    SQL QUERY
''')
    finaldf.coalesce(10).write.format("com.databricks.spark.csv").option("header", "true").option('sep', '\t').mode('append').save('hdfs://hdfs path')

1 Ответ

0 голосов
/ 20 сентября 2018

Прочитайте его как строковый тип, а затем преобразуйте в int.

df.withColumn("field3",df.field3.cast("int"))
...