PySpark Вложенные операции RDD - PullRequest
       6

PySpark Вложенные операции RDD

0 голосов
/ 31 октября 2018

Я пытаюсь внедрить K-ближайших соседей в PySpark. У меня возникла проблема в скорости с циклом «for», перебирающим весь набор данных Test, и поэтому я попытался использовать Функция "map" в наборе тестовых данных, выдавшая ошибку SPARK-5063, что означает, что вложенный RDD не поддерживается в Spark.

Ниже приведен код

import findspark
findspark.init()
import pyspark
import pyarrow.parquet as pq
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
import pandas as pd

Функция расстояния

def distanceBetweenTuples(data1 , data2) :
 squaredSum = 0.0
 for i in range(len(data1)):
     squaredSum = squaredSum + (data1[i] - data2[i])**2
 return(squaredSum**0.5)

Функция KNN

def KNN_Classify(Train,Test,K,Test_class):
    for j in range(len(Test)):    
    KNN_Data=[data[0] for data
    in((Train.cartesian(sc.parallelize([Test[j]],1)))\
                .map(lambda data : (data[0][1] 
    ,distanceBetweenTuples(data[0][0], data[1]))))\
                .takeOrdered(K, key = lambda data : data[1])] 
    Test_class.append(max(KNN_Data,key=KNN_Data.count))

Функция чтения данных

def read_file(path,parts=None,target_var=None):
    col_names=[]
    Data= pd.DataFrame()
    if target_var is not None:
       Data=pd.read_csv(path)
       col_names = list(Data.drop([target_var,],axis=1))
       return sc.parallelize(list(zip([tuple(x) for x in 
       Data[col_names].values], tuple(Data[target_var]))),parts)
    else:
       Data=pd.read_csv(path)
       return [tuple(x) for x in Data.values]

Чтение данных поезда и испытаний

Train_RDD=read_file(path="Train.csv",target_var='Target',parts=1)
Test_RDD= read_file(path="Test.csv")

Первый ряд Поезда

Train_RDD.first()

((2.027175118,5.6410692529999995,10.98619243,1.1699097090000001,1.406686778), 'Class1')

Тип и первый ряд теста

 print("The type of Test_RDD is",type(Test_RDD))
 Test_RDD[0]

Тип Test_RDD - «список»

массив ([1.7156137, 4.39461976, 5.66608099, 1.7953647, -1.54322475])

Применение функции ко всему набору данных

T1=[]
KNN_Classify(Train=Train_RDD,Test=Test_RDD,K=3,Test_class=T1)    
print(T1)

['class1', 'class1', 'class1', 'class1', 'class1', 'class1', 'class1', 'class1']

Теперь приведенный выше код работает нормально. Моя основная проблема связана с циклом for, который используется для итерации по всем строкам набора данных Test_RDD. Цикл for занимает много времени, когда у меня есть большой набор данных, например, 100K Train Строки V / S 100K Тестовые строки. Я даже пытался использовать функцию «карта», но она вызывает ошибку: SPARK-5063.

Теперь, основываясь на этом, я также исследую свойство "broadcast" в данных Train, однако оно не имеет декартовых атрибутов, а также, если данные большие, обычно это не рекомендуется из того, что я исследовал (открыт для предложений здесь).

Мои вопросы

1) Могу ли я использовать в моем текущем коде что-то кроме «for loop» для ускорения процесса? как "карта", не вызывая ошибку SPARK-5063.

2) Является ли трансляция способом решения подобных проблем?

PS: код для воспроизведения наборов данных

Train_List = [((3.09,1.97,3.73),'group1'),
                          ((2.96,2.15,4.16),'group1'),
                          ((2.87,1.93,4.39),'group1'),
                          ((3.02,1.55,4.43),'group1'),
                          ((1.80,3.65,2.08),'group2'),
                          ((1.36,9.43,1.95),'group2'),
                          ((1.71,7.35,1.94),'group2'),
                          ((1.03,9.75,2.12),'group2'),
                          ((2.30,7.59,1.99),'group2')]

Train_Data_RDD = sc.parallelize(Train_List,1)

Test_data = [(2.5, 1.7, 4.2),(8.5, 7.7, 2.1),(2.5, 1.7, 2.2)]
...