UDF в Pyspark, не вызывающий ошибку модуля с именем pyspark - PullRequest
0 голосов
/ 07 апреля 2020

Надеюсь, вы поможете

Ниже, у меня есть UDF, который ничего не делает, просто возвращает то же значение, которое было введено в него. Но это не работает.

Когда я запускаю скрипт без UDF, он работает нормально,

Как только я добавляю UDF, я получаю '/ usr / bin / python: нет модуля с именем pyspark '- очевидно, он найден, потому что в противном случае остальные не запустятся.

Мне нужно, чтобы UDF работал и мог передавать два значения из DF в это.

Кто-нибудь может мне помочь, пожалуйста?

#!/usr/bin/env python
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
import argparse, sys
from pyspark.sql import *
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import *

#create a context that supports hive
def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .master('yarn')\
        .config("hive.metastore.uris", "thrift://x.net:9083")\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session

### START MAIN ###
if __name__ == '__main__':
    spark_session = create_session('Eckoh')
    df1 = spark_session.table('design.web_copy').withColumn("fulldom", concat(col("domain"), col("url")))
    df_agg = df1.coalesce(1000)\
        .filter( (df1.fulldom.like('%facebook.com%') | df1.fulldom.like('%twitter.com%')))\
        .select('dt', 'dc', 'fulldom', 'nsvsn', 'version', 'serverport', 'userid', 'optimisedsize', 'sessionid', 'customrepgrp')\
        .withColumn("https", when(col("serverport") == "443", "HTTPS").otherwise("HTTP"))

    df2 = spark_session.table('design.radius_copy').withColumnRenamed('userid', 'userid2').withColumnRenamed('sessionid', 'sessionid2')
    df2_agg = df2.coalesce(1000).select('userid2', 'sessionid2', 'custavp1')

    conditions = [df_agg.sessionid == df2_agg.sessionid2, df_agg.userid == df2_agg.userid2]
    df3 = df_agg.join(df2_agg, conditions, how='left')

    def shield(x):

        return x

    spark_session.udf.register("saveshield", shield)
    spark_udf = sqlfunc.udf(shield, StringType())
    df4 = df3.withColumn('get_profile', spark_udf(df3['custavp1']))
    df4.show()
    df4.createOrReplaceTempView("tt")
    finaldf = spark_session.sql("create Table keenek1.dftest111 as select * from tt")
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...