использование UDF и simpe-фреймов данных в pyspark - PullRequest
0 голосов
/ 24 января 2020

Я новичок в pyspark и пытаюсь сделать что-то похожее на приведенное ниже: вызвать функцию PrintDetails для каждого повара ie, а затем записать результат в файл. Запрос spark. sql возвращает правильные данные, и я также могу сериализовать их в файл. Может ли кто-нибудь помочь с заявлением на каждого повара ie. Какой должен быть синтаксис для вызова UDF и как я могу записать вывод в текстовый файл?

любая помощь приветствуется. Спасибо

@udf(returnType=StringType())
def PrintDetails(cookie, timestamps,current_day, current_hourly_threshold,current_daily_threshold):
     #DO SOME WORK
     return "%s\t%d\t%d\t%d\t%d\t%s" %(some_data)

def main(argv):
    spark = SparkSession \
        .builder \
        .appName("parquet_test") \
        .config("spark.debug.maxToStringFields", "100") \
        .getOrCreate()

    inputPath = r'D:\Hadoop\Spark\parquet_input_files'
    inputFiles = os.path.join(inputPath, '*.parquet')

    impressionDate =  datetime.strptime("2019_12_31", '%Y_%m_%d')
    current_hourly_threshold = 40
    current_daily_threshold = 200

    parquetFile = spark.read.parquet(inputFiles)
    parquetFile.createOrReplaceTempView("parquetFile")
    cookie_and_time = spark.sql("SELECT cookie, collect_list(date_format(from_unixtime(ts), 'YYYY-mm-dd-H:M:S'))  as imp_times FROM parquetFile group by 1  ")

    for cookie in cookie_and_time :
        PrintDetails(cookie('cookie'), cookie('imp_times'), impressionDate, current_hourly_threshold, current_daily_threshold))

1 Ответ

0 голосов
/ 24 января 2020

Вы можете сделать, как показано ниже.

cookie_df= cookie_and_time.withColumn("cookies",PrintDetails(cookie('cookie'), cookie('imp_times'), lit(impressionDate), lit(current_hourly_threshold), lit(current_daily_threshold)))

Или вы можете определить все свои переменные в самой функции udf и избежать передачи в качестве аргументов.

...