Аргумент должен быть строкой или числом, а не 'Type' - Pyspark - PullRequest
0 голосов
/ 11 февраля 2020

Обновление: Итак, я изучил проблему, проблема в потоке данных scikit-multiflow. в последней четверти кода stream_clf.partial_fit(X,y, classes=stream.target_values) здесь значение класса для stream.target_values ​​должно быть числом или строкой, но метод возвращает (dtype). Когда я печатаю или l oop stream.target_values, я получаю это: enter image description here

Я пытался сделать преобразование et c. но все равно бесполезно. Может кто-нибудь, пожалуйста, помогите здесь?

Начальная проблема

Я запускаю код (вдохновил здесь ). Он отлично работает, когда используется ванильная среда python.

Но если я запускаю этот код после определенной модификации в Apache Spark с использованием Pyspark, я получаю следующую ошибку

TypeError: int() argument must be a string, a bytes-like object or a number, not 'type'

Я испробовал все возможные способы проследить проблему, но все выглядит хорошо. Ошибка возникает из последней строки кода, где дерево подсказок вызывается для предсказания. Он ожидает ndarray, а тип переменной X также ndarray. Я не уверен, что вызывает проблему. Может кто-нибудь помочь или направить меня к правильному следу?

полный стек ошибок:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-52-1310132c88db> in <module>
     30         D3_win.addInstance(X,y)
     31         xx = np.array(X,dtype='float64')
---> 32         y_hat = stream_clf.predict(xx)
     33 
     34 

~/conceptDrift/projectTest/lib/python3.5/site-packages/skmultiflow/trees/hoeffding_tree.py in predict(self, X)
   1068         r, _ = get_dimensions(X)
   1069         predictions = []
-> 1070         y_proba = self.predict_proba(X)
   1071         for i in range(r):
   1072             index = np.argmax(y_proba[i])

~/conceptDrift/projectTest/lib/python3.5/site-packages/skmultiflow/trees/hoeffding_tree.py in predict_proba(self, X)
   1099                     votes = normalize_values_in_dict(votes, inplace=False)
   1100                 if self.classes is not None:
-> 1101                     y_proba = np.zeros(int(max(self.classes)) + 1)
   1102                 else:
   1103                     y_proba = np.zeros(int(max(votes.keys())) + 1)

TypeError: int() argument must be a string, a bytes-like object or a number, not 'type'

Код

import findspark
findspark.init()
import pyspark as ps
import warnings
from pyspark.sql import functions as fn
import sys
from pyspark import SparkContext,SparkConf
import pandas as pd  
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedKFold
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score as AUC
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
from skmultiflow.trees.hoeffding_tree import HoeffdingTree
from skmultiflow.data.data_stream import DataStream
import time
def drift_detector(S,T,threshold = 0.75):
    T = pd.DataFrame(T)
    #print(T)
    S = pd.DataFrame(S)
    # Give slack variable in_target which is 1 for old and 0 for new
    T['in_target'] = 0 # in target set
    S['in_target'] = 1 # in source set

    # Combine source and target with new slack variable
    ST = pd.concat( [T, S], ignore_index=True, axis=0)
    labels = ST['in_target'].values
    ST = ST.drop('in_target', axis=1).values
    # You can use any classifier for this step. We advise it to be a simple one as we want to see whether source
    # and target differ not to classify them.
    clf = LogisticRegression(solver='liblinear')
    predictions = np.zeros(labels.shape)
    # Divide ST into two equal chunks
    # Train LR on a chunk and classify the other chunk
    # Calculate AUC for original labels (in_target) and predicted ones
    skf = StratifiedKFold(n_splits=2, shuffle=True)
    for train_idx, test_idx in skf.split(ST, labels):
        X_train, X_test = ST[train_idx], ST[test_idx]
        y_train, y_test = labels[train_idx], labels[test_idx]
        clf.fit(X_train, y_train)
        probs = clf.predict_proba(X_test)[:, 1]
        predictions[test_idx] = probs
    auc_score = AUC(labels, predictions)
    print(auc_score)
    # Signal drift if AUC is larger than the threshold
    if auc_score > threshold:
        return True
    else:
        return False
class D3():
    def __init__(self, w, rho, dim, auc):
        self.size = int(w*(1+rho))
        self.win_data = np.zeros((self.size,dim))
        self.win_label = np.zeros(self.size)
        self.w = w
        self.rho = rho
        self.dim = dim
        self.auc = auc
        self.drift_count = 0
        self.window_index = 0
    def addInstance(self,X,y):
        if(self.isEmpty()):
            self.win_data[self.window_index] = X
            self.win_label[self.window_index] = y
            self.window_index = self.window_index + 1
        else:
            print("Error: Buffer is full!")
    def isEmpty(self):
        return self.window_index < self.size
    def driftCheck(self):
        if drift_detector(self.win_data[:self.w], self.win_data[self.w:self.size], auc): #returns true if drift is detected
            self.window_index = int(self.w * self.rho)
            self.win_data = np.roll(self.win_data, -1*self.w, axis=0)
            self.win_label = np.roll(self.win_label, -1*self.w, axis=0)
            self.drift_count = self.drift_count + 1
            return True
        else:
            self.window_index = self.w
            self.win_data = np.roll(self.win_data, -1*(int(self.w*self.rho)), axis=0)
            self.win_label =np.roll(self.win_label, -1*(int(self.w*self.rho)), axis=0)
            return False
    def getCurrentData(self):
        return self.win_data[:self.window_index]
    def getCurrentLabels(self):
        return self.win_label[:self.window_index]

def select_data(x):
    x = "/user/hadoop1/tellus/sea_1.csv"
    peopleDF = spark.read.csv(x, header= True)
    df = peopleDF.toPandas()
    scaler = MinMaxScaler()
    df.iloc[:,0:df.shape[1]-1] = scaler.fit_transform(df.iloc[:,0:df.shape[1]-1])
    return df
def check_true(y,y_hat):
    if(y==y_hat):
        return 1
    else:
        return 0
df = select_data("/user/hadoop1/tellus/sea_1.csv")
stream = DataStream(df)
stream.prepare_for_use()
stream_clf = HoeffdingTree()
w = int(2000)
rho = float(0.4)
auc = float(0.60)


# In[ ]:


D3_win = D3(w,rho,stream.n_features,auc)
stream_acc = []
stream_record = []
stream_true= 0

i=0
start = time.time()
X,y = stream.next_sample(int(w*rho))
stream_clf.partial_fit(X,y, classes=stream.target_values)
while(stream.has_more_samples()):
    X,y = stream.next_sample()
    if D3_win.isEmpty():
        D3_win.addInstance(X,y)
        y_hat = stream_clf.predict(X)

1 Ответ

1 голос
/ 11 февраля 2020

Проблема была с функцией select_data (), тип данных переменных изменялся во время выполнения. Эта проблема исправлена.

...