Использование оператора when в pyspark - не работает, когда я добавляю в различные части скрипта - PullRequest
0 голосов
/ 21 сентября 2018

Я новичок в PySpark, но мне удалось заставить работать ниже.

У меня есть еще 2 требования, оба из которых я бы выполнил с помощью оператора case в SQL.

Я попробовал следующее:

sqlfunc\
.when((df5.time_minute > 0) &(df5.time_minute < 16) , “Q1” )\
.when((df5.time_minute > 15) &(df5.time_minute < 31) , “Q2” )\
.when((df5.time_minute > 30) &(df5.time_minute < 46) , “Q3” )\
.when((df5.time_minute > 45) &(df5.time_minute < 61) , “Q4” )\
.otherwise("Unknown")\
.alias("Quarter"))

Я пытался добавить это как условие withColumn (), а также в select.но в любом случае, он не создает новый столбец с результатом в нем.

Может ли кто-нибудь посоветовать мне, как мне добавить оператор case в сценарий, чтобы выходные данные содержалиновый столбец.Как я уже сказал, я пытался - withColumn ('ColumnName', когда оператор ....) - во время выбора ('field1', 'field2', когда оператор)

Любая помощь будет большой

from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
import argparse, sys
from pyspark.sql import *
import pyspark.sql.functions as sqlfunc
from datetime import datetime
#create a context that supports hive
def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .master('yarn')\
        .config("hive.metastore.uris", "thrift:IP:9083")\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session

### START MAIN ###
if __name__ == '__main__':
    spark_session = create_session('testing_files')
    dt_now = datetime.now()

    today_unixtime = long(dt_now.strftime('%s'))
    today_date = datetime.fromtimestamp(today_unixtime).strftime('%Y%m%d')

    twoday_unixtime = long(dt_now.strftime('%s')) - 24*60*60*2
    twoday = datetime.fromtimestamp(twoday_unixtime).strftime('%Y%m%d')

    hourago = long(dt_now.strftime('%s')) - 60*60*4
    hrdate = datetime.fromtimestamp(hourago).strftime('%H')

    schema = [\
        StructField('field1', StringType(), True),\
        StructField('field2',StringType(), True), \
        StructField('field3', StringType(), True), \
        StructField('field4',LongType(), True) \
        ]
    final_structure = StructType(schema)

    df1 = spark_session.read\
        .option("header","false")\
        .option("delimiter", "\t")\
        .csv('hdfs://directory/dt=%s/*/*/*' %today_date, final_structure)

    usercatschema = [\
        StructField('field1', StringType(), True),\
        StructField('field2',StringType(), True), \
        StructField('field3',StringType(), True) \
        ]
    usercat_structure = StructType(usercatschema)

    df2 = spark_session.read\
        .option("header","false")\
        .option("delimiter", "\t")\
        .csv('hdfs://directory/dt=%s/*' %twoday, usercat_structure)


    df3 = df2.select('field1','field2', 'field3')
    df4= df1.join(df3,(df1.field1==df3.field1)&(sqlfunc.substring(df1.field2, 0, 14)==df3.field2),"left")

    df5 = df4\
        .coalesce(1000)\
        .select('df1.field1','df2.field1', ......)\
        .groupBy('field1','field2'....)\
        .agg(
            sqlfunc.sum(df4.field1).alias('upload'),\
            sqlfunc.sum(df4.field2).alias('download'),\
            sqlfunc.countDistinct(df4.field3).alias('distinct_field3'),\
            sqlfunc.count(df4.field4).alias('field4')\
            )\
        .select('field1......)

    df5.show()

1 Ответ

0 голосов
/ 23 сентября 2018

Вот рабочий скрипт:

df5 = df4\
.coalesce(1000)\
.withColumn('quarter',\
sqlfunc.when((df4.time_minute >-1 ) & (df4.time_minute < 16), 1)\
.when((df4.time_minute >15 ) & (df4.time_minute < 31), 2)\
.when((df4.time_minute >30 ) & (df4.time_minute < 46), 3)\
.when((df4.time_minute >45 ) & (df4.time_minute < 61), 4)\
.otherwise(5))\
.select('field1','field2', 'date', 'time_hour', 'time_minute')\
.groupBy('date', 'time_hour', 'quarter')\
.agg(
sqlfunc.sum(df4.field1).alias('sumfield1'),\
sqlfunc.sum(df4.field2).alias('sumfield2'),\
)\
.select('date', 'time_hour', 'quarter', 'sumfield1', 'sumfield2')
df5.show()
...