Python: Как получить нужный список? - PullRequest
0 голосов
/ 01 марта 2020

Я пытаюсь изучить Spark, поэтому я совершенно новичок в этом деле.

У меня есть файл с тысячами строк, каждая из которых имеет следующую структуру:

LFPG;EDDW;00;E170;370;LFPG;EDDW;189930555;150907;1826;!!!!;AFR1724;AFR;0;AFR1724-LFPG-EDDW-20150907180000;N;0;;;245382;;;150907;1800;0;;X;;;;;;;;;;370;;0;20150907175700;AA45458743;;;;;NEXE;NEXE;;;;;20150907180000;;;;245382;;;;;;;;;;;;;;;;;;;;;;;;;;;;AFR;;;;;;;;;;;0

линия выше представляет информацию о рейсе с самолета, он вылетел из LFPG (1-й элемент) и приземлился в EDDW (2-й элемент), остальная информация не имеет отношения к этой цели.

Я хотел бы распечатайте или сохраните в файле десять самых загруженных аэропортов, основываясь на общем количестве перемещений самолетов, то есть самолетов, которые взлетели или приземлились в аэропорту.

Таким образом, в некотором смысле желаемый результат будет:

AIRPORT_NAME    #TOTAL_MOVEMENTS    #TAKE-OFFs    #LANDINGS

Я уже реализовал эту программу в python и хотел бы преобразовать ее, используя парадигму MAP / Reduce, используя Spark.

# Libraries
import sys
from collections import Counter
import collections
from itertools import chain
from collections import defaultdict

# START

# Defining default program argument
if len(sys.argv)==1:
    fileName = "airports.exp2"
else:
    fileName = sys.argv[1]

takeOffAirport = []
landingAirport = []

# Reading file
lines = 0 # Counter for file lines
try:
    with open(fileName) as file:
        for line in file:
            words = line.split(';')
            # Relevant data, item1 and item2 from each file line
            origin = words[0]
            destination = words[1]
            # Populating lists
            landingAirport.append(destination)
            takeOffAirport.append(origin)

except IOError:
    print ("\n\033[0;31mIoError: could not open the file:\033[00m %s" %fileName)

airports_dict = defaultdict(list)

# Merge lists into a dictionary key:value
for key, value in chain(Counter(takeOffAirport).items(), 
                    Counter(landingAirport).items()):
    # 'AIRPOT_NAME':[num_takeOffs, num_landings]
    airports_dict[key].append(value) 
# Sum key values and add it as another value
for key, value in airports_dict.items():
   #'AIRPOT_NAME':[num_totalMovements, num_takeOffs, num_landings]
   airports_dict[key] = [sum(value),value]  

# Sort dictionary by the top 10  total movements
airports_dict = sorted(airports_dict.items(), 
                        key=lambda kv:kv[1], reverse=True)[:10]
airports_dict = collections.OrderedDict(airports_dict)

# Print results 
print("\nAIRPORT"+ "\t\t#TOTAL_MOVEMENTS"+ "\t#TAKEOFFS"+ "\t#LANDINGS")
for k in airports_dict:
    print(k,"\t\t", airports_dict[k][0],
          "\t\t\t", airports_dict[k][1][1],
          "\t\t", airports_dict[k][1][0])

Тестовый файл можно загрузить с : https://srv-file7.gofile.io/download/YCnWxr/traffic1day.exp2

До сих пор я был в состоянии получить самые первые и вторые элементы из файла, но я не очень хорошо знаю, как реализовать фильтр или уменьшить чтобы получить частоту т То есть, что каждый аэропорт появляется в каждом списке, а затем объединяет оба списка, добавляя название аэропорта, сумму взлетов и посадок и количество взлетов и посадок.

from pyspark import SparkContext, SparkConf


if __name__ == "__main__":
    conf = SparkConf().setAppName("airports").setMaster("local[*]")
    sc = SparkContext(conf = conf)

    airports = sc.textFile("traffic1hour.exp2", minPartitions=4)
    airports = airports.map(lambda line : line.split('\n'))

    takeOff_airports = airports.map(lambda sub: (sub[0].split(';')[0]))
    landing_airports = airports.map(lambda sub: (sub[0].split(';')[1]))

    takeOff_airports.saveAsTextFile("takeOff_airports.txt")
    landing_airports.saveAsTextFile("landing_airport.txt")

Любая подсказка или руководство будет очень цениться .

...