Я пытаюсь внедрить K-ближайших соседей в PySpark.
У меня возникла проблема в скорости с циклом «for», перебирающим весь набор данных Test, и поэтому я попытался использовать
Функция "map" в наборе тестовых данных, выдавшая ошибку SPARK-5063, что означает, что вложенный RDD не поддерживается в Spark.
Ниже приведен код
import findspark
findspark.init()
import pyspark
import pyarrow.parquet as pq
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
import pandas as pd
Функция расстояния
def distanceBetweenTuples(data1 , data2) :
squaredSum = 0.0
for i in range(len(data1)):
squaredSum = squaredSum + (data1[i] - data2[i])**2
return(squaredSum**0.5)
Функция KNN
def KNN_Classify(Train,Test,K,Test_class):
for j in range(len(Test)):
KNN_Data=[data[0] for data
in((Train.cartesian(sc.parallelize([Test[j]],1)))\
.map(lambda data : (data[0][1]
,distanceBetweenTuples(data[0][0], data[1]))))\
.takeOrdered(K, key = lambda data : data[1])]
Test_class.append(max(KNN_Data,key=KNN_Data.count))
Функция чтения данных
def read_file(path,parts=None,target_var=None):
col_names=[]
Data= pd.DataFrame()
if target_var is not None:
Data=pd.read_csv(path)
col_names = list(Data.drop([target_var,],axis=1))
return sc.parallelize(list(zip([tuple(x) for x in
Data[col_names].values], tuple(Data[target_var]))),parts)
else:
Data=pd.read_csv(path)
return [tuple(x) for x in Data.values]
Чтение данных поезда и испытаний
Train_RDD=read_file(path="Train.csv",target_var='Target',parts=1)
Test_RDD= read_file(path="Test.csv")
Первый ряд Поезда
Train_RDD.first()
((2.027175118,5.6410692529999995,10.98619243,1.1699097090000001,1.406686778),
'Class1')
Тип и первый ряд теста
print("The type of Test_RDD is",type(Test_RDD))
Test_RDD[0]
Тип Test_RDD - «список»
массив ([1.7156137, 4.39461976, 5.66608099, 1.7953647, -1.54322475])
Применение функции ко всему набору данных
T1=[]
KNN_Classify(Train=Train_RDD,Test=Test_RDD,K=3,Test_class=T1)
print(T1)
['class1', 'class1', 'class1', 'class1', 'class1', 'class1', 'class1', 'class1']
Теперь приведенный выше код работает нормально. Моя основная проблема связана с циклом for, который используется для итерации по всем строкам набора данных Test_RDD. Цикл for занимает много времени, когда у меня есть большой набор данных, например, 100K Train Строки V / S 100K Тестовые строки. Я даже пытался использовать функцию «карта», но она вызывает ошибку: SPARK-5063.
Теперь, основываясь на этом, я также исследую свойство "broadcast" в данных Train, однако оно не имеет декартовых атрибутов, а также, если данные большие, обычно это не рекомендуется из того, что я исследовал (открыт для предложений здесь).
Мои вопросы
1) Могу ли я использовать в моем текущем коде что-то кроме «for loop» для ускорения процесса? как "карта", не вызывая ошибку SPARK-5063.
2) Является ли трансляция способом решения подобных проблем?
PS: код для воспроизведения наборов данных
Train_List = [((3.09,1.97,3.73),'group1'),
((2.96,2.15,4.16),'group1'),
((2.87,1.93,4.39),'group1'),
((3.02,1.55,4.43),'group1'),
((1.80,3.65,2.08),'group2'),
((1.36,9.43,1.95),'group2'),
((1.71,7.35,1.94),'group2'),
((1.03,9.75,2.12),'group2'),
((2.30,7.59,1.99),'group2')]
Train_Data_RDD = sc.parallelize(Train_List,1)
Test_data = [(2.5, 1.7, 4.2),(8.5, 7.7, 2.1),(2.5, 1.7, 2.2)]