Я новичок в PySpark, но мне удалось заставить работать ниже.
У меня есть еще 2 требования, оба из которых я бы выполнил с помощью оператора case в SQL.
Я попробовал следующее:
sqlfunc\
.when((df5.time_minute > 0) &(df5.time_minute < 16) , “Q1” )\
.when((df5.time_minute > 15) &(df5.time_minute < 31) , “Q2” )\
.when((df5.time_minute > 30) &(df5.time_minute < 46) , “Q3” )\
.when((df5.time_minute > 45) &(df5.time_minute < 61) , “Q4” )\
.otherwise("Unknown")\
.alias("Quarter"))
Я пытался добавить это как условие withColumn (), а также в select.но в любом случае, он не создает новый столбец с результатом в нем.
Может ли кто-нибудь посоветовать мне, как мне добавить оператор case в сценарий, чтобы выходные данные содержалиновый столбец.Как я уже сказал, я пытался - withColumn ('ColumnName', когда оператор ....) - во время выбора ('field1', 'field2', когда оператор)
Любая помощь будет большой
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
import argparse, sys
from pyspark.sql import *
import pyspark.sql.functions as sqlfunc
from datetime import datetime
#create a context that supports hive
def create_session(appname):
spark_session = SparkSession\
.builder\
.appName(appname)\
.master('yarn')\
.config("hive.metastore.uris", "thrift:IP:9083")\
.enableHiveSupport()\
.getOrCreate()
return spark_session
### START MAIN ###
if __name__ == '__main__':
spark_session = create_session('testing_files')
dt_now = datetime.now()
today_unixtime = long(dt_now.strftime('%s'))
today_date = datetime.fromtimestamp(today_unixtime).strftime('%Y%m%d')
twoday_unixtime = long(dt_now.strftime('%s')) - 24*60*60*2
twoday = datetime.fromtimestamp(twoday_unixtime).strftime('%Y%m%d')
hourago = long(dt_now.strftime('%s')) - 60*60*4
hrdate = datetime.fromtimestamp(hourago).strftime('%H')
schema = [\
StructField('field1', StringType(), True),\
StructField('field2',StringType(), True), \
StructField('field3', StringType(), True), \
StructField('field4',LongType(), True) \
]
final_structure = StructType(schema)
df1 = spark_session.read\
.option("header","false")\
.option("delimiter", "\t")\
.csv('hdfs://directory/dt=%s/*/*/*' %today_date, final_structure)
usercatschema = [\
StructField('field1', StringType(), True),\
StructField('field2',StringType(), True), \
StructField('field3',StringType(), True) \
]
usercat_structure = StructType(usercatschema)
df2 = spark_session.read\
.option("header","false")\
.option("delimiter", "\t")\
.csv('hdfs://directory/dt=%s/*' %twoday, usercat_structure)
df3 = df2.select('field1','field2', 'field3')
df4= df1.join(df3,(df1.field1==df3.field1)&(sqlfunc.substring(df1.field2, 0, 14)==df3.field2),"left")
df5 = df4\
.coalesce(1000)\
.select('df1.field1','df2.field1', ......)\
.groupBy('field1','field2'....)\
.agg(
sqlfunc.sum(df4.field1).alias('upload'),\
sqlfunc.sum(df4.field2).alias('download'),\
sqlfunc.countDistinct(df4.field3).alias('distinct_field3'),\
sqlfunc.count(df4.field4).alias('field4')\
)\
.select('field1......)
df5.show()