python: разбить среднее количество друзей по возрастным группам - PullRequest
0 голосов
/ 17 января 2019

я написал код в последовательном режиме, используя python, и параллельную версию того же кода в спарке данных. моя параллельная реализация выглядит нормально, но перевод ее в последовательную реализацию представляет собой сложную задачу, особенно эффективность кода. Цель состоит в том, чтобы измерить выигрыш в производительности от параллельной реализации. здесь под образцом находятся данные, которые имеют 4 атрибута: идентификатор пользователя, имя, его возраст и имя друга, с которым он связан.

enter image description here

- это последовательные и параллельные реализации:

 parallel implementation
         sc = SparkContext(master='local[4]')
         from pyspark.sql import SQLContext
         sqlContext = SQLContext(sc)
    data=sc.textFile('C:/Users/abanfo/Desktop/Assignment_parralel/testing_data.csv')
    def parse(line):
        fields = line.split(",")
        name = (fields[1])
        friends = (fields[3])
        friends = 1
        age = int(fields[2])
        name_age = (name,age) # the name and age combination uniquely identifies individuals
        return (name_age,friends)
    name_age_friend =data.map(parse)
    # name_age_friend is an RDD with name and age as a key and friends name is replaced with 1, 
    #meanining the person has one friend.
    print(name_age_friend.take(30))
# the number of friends for a user are added and the name is removed, it is not important for further calculation
first_RDD = name_age_friend.reduceByKey(lambda x,y : x + y).map(lambda row: (row[0][0],(row[0][1],row[1]))).map(lambda x : x[1])
print(first_RDD.take(30))
# the value is replaced with the number of friends and a number one ,
#means he is a single person that wil help latter to know the size of age group 
Second_RDD = first_RDD.mapValues(lambda x : (x,1))
print(Second_RDD.take(30))
# breaking age range into age group
def age(line):
    ageRange = int(line[0])
    number_friends =line[1] 
    if ageRange in range(16,20):
        ageRange ='teens'        
    elif ageRange in range(20,40):
        ageRange = 'Adult'
    elif ageRange in range(40,50):
        ageRange = 'MiddleAge'
    else:
        ageRange = 'old'        
    return (ageRange,number_friends)
third_RDD = Second_RDD.map(age)
print(third_RDD.take(30))
## collected user an the same age range
fourth_RDD = third_RDD.reduceByKey(lambda x,y : (x[0]+y[0],x[1]+y[1]))
print(fourth_RDD.take(30))
age_group_average_friends = fourth_RDD.mapValues(lambda x : int(x[0]/x[1]))
print(age_group_average_friends.take(5))

и последовательная реализация

#loc the attribute or features of interest
friends = df.iloc[:,3]
ages = df.iloc[:,2]

# default of dictionary with age as key and value as a list of friends 
dictionary_age_friends = defaultdict(list)

# populating the dictionary with key age and values friend
for i,j in zip(ages,friends):
    dictionary_age_friends[i].append(j)
print("first dict")
print(dictionary_age_friends)

#second dictionary, the same age is collected and the number of friends is added 
set_dict ={}
for x in dictionary_age_friends:
    list_friends =[]
    for y in dictionary_age_friends[x]:
        list_friends.append(y)
    set_list_len = len(list_friends) # assign a friend with a number 1
    set_dict[x] = set_list_len
print(set_dict)

# set_dict ={}
# for x in dictionary_age_friends:
#     print("inside the loop")
#     lis_1 =[]
#     for y in dictionary_age_friends[x]:
#         lis_1.append(y)
#         set_list = lis_1
#         set_list = [1 for x in set_list] # assign a friend with a number 1
#         set_dict[x] = sum(set_list)

# a dictionary that assign the age range into age-groups
second_dict = defaultdict(list)
for i,j in set_dict.items(): 
    if i in range(16,20):           
        i = 'teens_youthAdult'
        second_dict[i].append(j)
    elif i in range(20,40):       
        i ="Adult"
        second_dict[i].append(j)
    elif i in  range(40,60):        
        i ="MiddleAge"
        second_dict[i].append(j)
    elif i in range(60,72):       
        i = "old"
        second_dict[i].append(j)
print(second_dict)
print("final dict stared")
new_dic ={}

for key,value in second_dict.items():
    if key == 'teens_youthAdult':
        new_dic[key] = round((sum(value)/len(value)),2)
    elif key =='Adult':
        new_dic[key] = round((sum(value)/len(value)),2)
    elif key =='MiddleAge' :
        new_dic[key] = round((sum(value)/len(value)),2)
    else:
        new_dic[key] = round((sum(value)/len(value)),2)
new_dic
end_time = datetime.datetime.now()


print(end_time-start_time)


print(new_dic)

обратная связь, которую я получил от последовательной реализации,

  1. не уверен, что выполняет правильную задачу
  2. не оптимизировано
  3. есть много ненужных словарей и кодов

стек iam при переводе параллельной реализации в последовательную. Любое предложение или помощь высоко ценится.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...