Я пытаюсь изучить Spark, поэтому я совершенно новичок в этом деле.
У меня есть файл с тысячами строк, каждая из которых имеет следующую структуру:
LFPG;EDDW;00;E170;370;LFPG;EDDW;189930555;150907;1826;!!!!;AFR1724;AFR;0;AFR1724-LFPG-EDDW-20150907180000;N;0;;;245382;;;150907;1800;0;;X;;;;;;;;;;370;;0;20150907175700;AA45458743;;;;;NEXE;NEXE;;;;;20150907180000;;;;245382;;;;;;;;;;;;;;;;;;;;;;;;;;;;AFR;;;;;;;;;;;0
линия выше представляет информацию о рейсе с самолета, он вылетел из LFPG (1-й элемент) и приземлился в EDDW (2-й элемент), остальная информация не имеет отношения к этой цели.
Я хотел бы распечатайте или сохраните в файле десять самых загруженных аэропортов, основываясь на общем количестве перемещений самолетов, то есть самолетов, которые взлетели или приземлились в аэропорту.
Таким образом, в некотором смысле желаемый результат будет:
AIRPORT_NAME #TOTAL_MOVEMENTS #TAKE-OFFs #LANDINGS
Я уже реализовал эту программу в python и хотел бы преобразовать ее, используя парадигму MAP / Reduce, используя Spark.
# Libraries
import sys
from collections import Counter
import collections
from itertools import chain
from collections import defaultdict
# START
# Defining default program argument
if len(sys.argv)==1:
fileName = "airports.exp2"
else:
fileName = sys.argv[1]
takeOffAirport = []
landingAirport = []
# Reading file
lines = 0 # Counter for file lines
try:
with open(fileName) as file:
for line in file:
words = line.split(';')
# Relevant data, item1 and item2 from each file line
origin = words[0]
destination = words[1]
# Populating lists
landingAirport.append(destination)
takeOffAirport.append(origin)
except IOError:
print ("\n\033[0;31mIoError: could not open the file:\033[00m %s" %fileName)
airports_dict = defaultdict(list)
# Merge lists into a dictionary key:value
for key, value in chain(Counter(takeOffAirport).items(),
Counter(landingAirport).items()):
# 'AIRPOT_NAME':[num_takeOffs, num_landings]
airports_dict[key].append(value)
# Sum key values and add it as another value
for key, value in airports_dict.items():
#'AIRPOT_NAME':[num_totalMovements, num_takeOffs, num_landings]
airports_dict[key] = [sum(value),value]
# Sort dictionary by the top 10 total movements
airports_dict = sorted(airports_dict.items(),
key=lambda kv:kv[1], reverse=True)[:10]
airports_dict = collections.OrderedDict(airports_dict)
# Print results
print("\nAIRPORT"+ "\t\t#TOTAL_MOVEMENTS"+ "\t#TAKEOFFS"+ "\t#LANDINGS")
for k in airports_dict:
print(k,"\t\t", airports_dict[k][0],
"\t\t\t", airports_dict[k][1][1],
"\t\t", airports_dict[k][1][0])
Тестовый файл можно загрузить с : https://srv-file7.gofile.io/download/YCnWxr/traffic1day.exp2
До сих пор я был в состоянии получить самые первые и вторые элементы из файла, но я не очень хорошо знаю, как реализовать фильтр или уменьшить чтобы получить частоту т То есть, что каждый аэропорт появляется в каждом списке, а затем объединяет оба списка, добавляя название аэропорта, сумму взлетов и посадок и количество взлетов и посадок.
from pyspark import SparkContext, SparkConf
if __name__ == "__main__":
conf = SparkConf().setAppName("airports").setMaster("local[*]")
sc = SparkContext(conf = conf)
airports = sc.textFile("traffic1hour.exp2", minPartitions=4)
airports = airports.map(lambda line : line.split('\n'))
takeOff_airports = airports.map(lambda sub: (sub[0].split(';')[0]))
landing_airports = airports.map(lambda sub: (sub[0].split(';')[1]))
takeOff_airports.saveAsTextFile("takeOff_airports.txt")
landing_airports.saveAsTextFile("landing_airport.txt")
Любая подсказка или руководство будет очень цениться .