• 1000 выяснили сам подход, я изо всех сил пытаюсь реализовать его с соответствующим использованием функций карты и сокращения.
def Find_dist(x,y):
sum = 0
vec1= list(x)
vec2 = list(y)
for i in range(len(vec1)):
sum = sum +(vec1[i]-vec2[i])*(vec1[i]-vec2[i])
return sum
def mapper(cent, datapoint):
min = Find_dist(datapoint,cent[0])
closest = cent[0]
for i in range(1,len(cent)):
curr = Find_dist(datapoint,cent[i])
if curr < min:
min = curr
closest = cent[i]
yield closest,datapoint
def combine(x):
Key = x[0]
Values = x[1]
sum = [0]*len(Key)
counter = 0
for datapoint in Values:
vec = list(datapoint[0])
counter = counter+1
sum = sum+vec
point = Row(vec)
result = (counter,point)
yield Key, result
def Reducer(x):
Key = x[0]
Values = x[1]
sum = [0]*len(Key)
counter = 0
for datapoint in Values:
vec = list(datapoint[0])
counter = counter+1
sum = sum+vec
avg = [0]*len(Key)
for i in range(len(Key)):
avg[i] = sum[i]/counter
centroid = Row(avg)
yield Key, centroid
def kmeans_fit(data,k,max_iter):
centers = data.rdd.takeSample(False,k,seed=42)
for i in range(max_iter):
mapped = data.rdd.map(lambda x: mapper(centers,x))
combined = mapped.reduceByKeyLocally(lambda x: combiner(x))
reduced = combined.reduceByKey(lambda x: Reducer(x)).collect()
flag = True
for i in range(k):
if(reduced[i][1] != reduced[i][0] ):
for j in range(k):
centers[i] = reduced[i][1]
flag = False
break
if (flag):
break
return centers
data = spark.read.parquet("/mnt/ddscoursedatabricksstg/ddscoursedatabricksdata/random_data.parquet")
kmeans_fit(data,5,10)
* 1003 карты, функции reducebykeylocally и reducebykey. В настоящее время выполняется сбой при вызове reduceByKeyLocally (lambda x: combiner (x)), потому что «ValueError: недостаточно значений для распаковки (ожидалось 2, получено 1)», и мне действительно нужно, чтобы все это работало правильно в ближайшее время, поэтому, пожалуйста, кто-нибудь я хотел бы получить помощь в этом, и заранее спасибо, я буду очень благодарен за любую помощь!