Запросы с использованием Dataframes - найти 10 наиболее распространенных маршрутов - PullRequest
0 голосов
/ 28 декабря 2018

У меня есть школьная работа на основе этой проблемы веб-сайт - мы используем набор данных, доступный здесь.Я пытаюсь сделать следующее: Найти 10 самых популярных маршрутов для каждого периода в один час, для каждого дня недели.Выходные данные должны быть: день недели, час, [route1,…, route10].Мой код:

from pyspark.sql import *
from pyspark.sql.types import * 
from pyspark.sql.functions import * 

import datetime  
import time 

start_time = time.time() 
spark = SparkSession.builder.master('local[*]').appName('taxis').getOrCreate() sc = spark.sparkContext sc.setLogLevel("ERROR")

timeformat = "yyyy-MM-dd HH:mm:ss" 
dateformat = "EEEE"

lat = 41.474937   #first cell is (1,1) 
long = -74.913585 
south = 0.004491556 
east = 0.005986

try :
    lines = sc.textFile('sorted_data.csv')
    taxisRows = lines.filter( lambda line : len(line) > 0 )   \
                        .map( lambda line : line.split(',') ) \
                        .filter( lambda split_line : (float(split_line[6]) != 0) \
                                                    and (float(split_line[7]) != 0) \
                                                    and (float(split_line[8]) != 0) \
                                                    and (float(split_line[9]) != 0)) \
                        .map( lambda arr : Row(pickup_datetime = arr[2], dropoff_datetime = arr[3], \
                                                pickup_longitude = (float(arr[6]) - long), \
                                                pickup_latitude = (float(arr[7]) - lat), \
                                                dropoff_longitude = (float(arr[8]) - long), \
                                                dropoff_latitude = (float(arr[9]) - lat), \
                                                )) 

    taxisRowsDF = spark.createDataFrame( taxisRows )

    taxisRowsDF = taxisRowsDF.withColumn('route', struct( struct((round((abs(taxisRowsDF.pickup_latitude)/south)+1)), (round((abs(taxisRowsDF.pickup_longitude)/east)+1))) , \
                                                        struct((round((abs(taxisRowsDF.dropoff_latitude)/south)+1)), (round((abs(taxisRowsDF.dropoff_longitude)/east)+1))) ) )


    taxisRowsDF = taxisRowsDF.withColumn("weekday",date_format('pickup_datetime', format= 'E'))
    taxisRowsDF = taxisRowsDF.withColumn("hour", date_format("pickup_datetime", format = 'H'))

    routesFrequencyDF = taxisRowsDF.groupBy('weekday', 'hour', 'route').count().orderBy('count',ascending = False)
    tenMostFrequent = routesFrequencyDF.groupBy('weekday', 'hour').agg(collect_set('route').alias('List of Routes')).show()

    #tenMostFrequent1 = tenMostFrequent.select('List of Routes', size('List of Routes').alias('Number of Routes'))

    #tenMostFrequent.show(tenMostFrequent.count, False)
    #tenMostFrequent.show(10)
#     taxisRowsDF.show(10)                
#     routesFrequencyDF.show(10)
    print("---%s seconds---"% (time.time()-start_time))
    sc.stop() except Exception as e:
    print(e)
    sc.stop()

У меня есть частота каждого маршрута с routeFrequencyDF, и с помощью agg (colect_set ()) я могу создать набор значений (так, чтобы на выходе был список из10 самых частых маршрутов для каждой недели и часа), хотя я не могу объединить эти две части информации вместе.У кого-нибудь есть предложения?

...