я написал код в последовательном режиме, используя python, и параллельную версию того же кода в спарке данных. моя параллельная реализация выглядит нормально, но перевод ее в последовательную реализацию представляет собой сложную задачу, особенно эффективность кода. Цель состоит в том, чтобы измерить выигрыш в производительности от параллельной реализации. здесь под образцом находятся данные, которые имеют 4 атрибута: идентификатор пользователя, имя, его возраст и имя друга, с которым он связан.
- это последовательные и параллельные реализации:
parallel implementation
sc = SparkContext(master='local[4]')
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
def parse(line):
fields = line.split(",")
name = (fields[1])
friends = (fields[3])
friends = 1
age = int(fields[2])
name_age = (name,age) # the name and age combination uniquely identifies individuals
return (name_age,friends)
name_age_friend =data.map(parse)
# name_age_friend is an RDD with name and age as a key and friends name is replaced with 1,
#meanining the person has one friend.
# the number of friends for a user are added and the name is removed, it is not important for further calculation
first_RDD = name_age_friend.reduceByKey(lambda x,y : x + y).map(lambda row: (row[0][0],(row[0][1],row[1]))).map(lambda x : x[1])
# the value is replaced with the number of friends and a number one ,
#means he is a single person that wil help latter to know the size of age group
Second_RDD = first_RDD.mapValues(lambda x : (x,1))
# breaking age range into age group
def age(line):
ageRange = int(line[0])
number_friends =line[1]
if ageRange in range(16,20):
ageRange ='teens'
elif ageRange in range(20,40):
ageRange = 'Adult'
elif ageRange in range(40,50):
ageRange = 'MiddleAge'
ageRange = 'old'
return (ageRange,number_friends)
third_RDD = Second_RDD.map(age)
## collected user an the same age range
fourth_RDD = third_RDD.reduceByKey(lambda x,y : (x[0]+y[0],x[1]+y[1]))
age_group_average_friends = fourth_RDD.mapValues(lambda x : int(x[0]/x[1]))
и последовательная реализация
#loc the attribute or features of interest
friends = df.iloc[:,3]
ages = df.iloc[:,2]
# default of dictionary with age as key and value as a list of friends
dictionary_age_friends = defaultdict(list)
# populating the dictionary with key age and values friend
for i,j in zip(ages,friends):
print("first dict")
#second dictionary, the same age is collected and the number of friends is added
set_dict ={}
for x in dictionary_age_friends:
list_friends =[]
for y in dictionary_age_friends[x]:
set_list_len = len(list_friends) # assign a friend with a number 1
set_dict[x] = set_list_len
# set_dict ={}
# for x in dictionary_age_friends:
# print("inside the loop")
# lis_1 =[]
# for y in dictionary_age_friends[x]:
# lis_1.append(y)
# set_list = lis_1
# set_list = [1 for x in set_list] # assign a friend with a number 1
# set_dict[x] = sum(set_list)
# a dictionary that assign the age range into age-groups
second_dict = defaultdict(list)
for i,j in set_dict.items():
if i in range(16,20):
i = 'teens_youthAdult'
elif i in range(20,40):
i ="Adult"
elif i in range(40,60):
i ="MiddleAge"
elif i in range(60,72):
i = "old"
print("final dict stared")
new_dic ={}
for key,value in second_dict.items():
if key == 'teens_youthAdult':
new_dic[key] = round((sum(value)/len(value)),2)
elif key =='Adult':
new_dic[key] = round((sum(value)/len(value)),2)
elif key =='MiddleAge' :
new_dic[key] = round((sum(value)/len(value)),2)
new_dic[key] = round((sum(value)/len(value)),2)
end_time = datetime.datetime.now()
обратная связь, которую я получил от последовательной реализации,
- не уверен, что выполняет правильную задачу
- не оптимизировано
- есть много ненужных словарей и кодов
стек iam при переводе параллельной реализации в последовательную. Любое предложение или помощь высоко ценится.