Надеюсь, вы поможете
Ниже, у меня есть UDF, который ничего не делает, просто возвращает то же значение, которое было введено в него. Но это не работает.
Когда я запускаю скрипт без UDF, он работает нормально,
Как только я добавляю UDF, я получаю '/ usr / bin / python: нет модуля с именем pyspark '- очевидно, он найден, потому что в противном случае остальные не запустятся.
Мне нужно, чтобы UDF работал и мог передавать два значения из DF в это.
Кто-нибудь может мне помочь, пожалуйста?
#!/usr/bin/env python
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
import argparse, sys
from pyspark.sql import *
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.sql.functions import udf
from pyspark.sql.types import *
#create a context that supports hive
def create_session(appname):
spark_session = SparkSession\
.builder\
.appName(appname)\
.master('yarn')\
.config("hive.metastore.uris", "thrift://x.net:9083")\
.enableHiveSupport()\
.getOrCreate()
return spark_session
### START MAIN ###
if __name__ == '__main__':
spark_session = create_session('Eckoh')
df1 = spark_session.table('design.web_copy').withColumn("fulldom", concat(col("domain"), col("url")))
df_agg = df1.coalesce(1000)\
.filter( (df1.fulldom.like('%facebook.com%') | df1.fulldom.like('%twitter.com%')))\
.select('dt', 'dc', 'fulldom', 'nsvsn', 'version', 'serverport', 'userid', 'optimisedsize', 'sessionid', 'customrepgrp')\
.withColumn("https", when(col("serverport") == "443", "HTTPS").otherwise("HTTP"))
df2 = spark_session.table('design.radius_copy').withColumnRenamed('userid', 'userid2').withColumnRenamed('sessionid', 'sessionid2')
df2_agg = df2.coalesce(1000).select('userid2', 'sessionid2', 'custavp1')
conditions = [df_agg.sessionid == df2_agg.sessionid2, df_agg.userid == df2_agg.userid2]
df3 = df_agg.join(df2_agg, conditions, how='left')
def shield(x):
return x
spark_session.udf.register("saveshield", shield)
spark_udf = sqlfunc.udf(shield, StringType())
df4 = df3.withColumn('get_profile', spark_udf(df3['custavp1']))
df4.show()
df4.createOrReplaceTempView("tt")
finaldf = spark_session.sql("create Table keenek1.dftest111 as select * from tt")