повреждение l oop для фрейма данных в PySpark - PullRequest
0 голосов
/ 10 июля 2020

Я делаю al oop для повреждения моего набора данных в PySpark, и я хочу контролировать ошибки.
Во-первых: я составил список типа: 9 ошибок и 1 без ошибок.

#code_erreur= [ "replace","inverse","inserte","delete","espace","NA"]
code_erreur= ["replace"]
varibale =["VARIABLEA","VARIABLEB"]

Я начинаю с кода ошибки: заменить.
Выберите случайную букву из случайного «varibale», чтобы заменить ее другой случайной буквой.

Мой ввод:
VARIABLEA | VARIABLEB
СИНИЙ | БЕЛЫЙ
РОЗОВЫЙ | ТЕМНЫЙ


Мой ожидаемый результат:
VARIABLEA | VARIABLEB
BLTE | БЕЛЫЙ
РОЗОВЫЙ | DARM


И я сделал al oop:

def algo_corruption(lettre,code_erreur,nombre_erreur,varibale,data): 
  alp=(list(string.ascii_uppercase))  
  table_corruption=[]
  for i in range(1,data.count()):
    
    code_erreur_choisi =random.choice(code_erreur)
    varibale_choisie =random.choice(varibale)
  
    (table_corruption.append((code_erreur_choisi,
                            varibale_choisie)))
    
  cols=["code_erreur_choisi","varibale_choisie"]
  result = spark.createDataFrame(table_corruption, cols)
  result= result.withColumn("id", monotonically_increasing_id())
  data= data.withColumn("id", monotonically_increasing_id())
  data_join_result= data.join(result, "id","inner").drop("id")

  for j in range(1,data_join_result.count()):
    if data_join_result.filter(col("code_erreur_choisi") == "replace"):
      data_corrp = (data_join_result[varibale_choisie].replace(random.choice(data_join_result.collect()[j][varibale_choisie]),
                                                               random.choice(alp))
      display(data_corrp)
    
    else:
      print("erreur pas encore codée") 
      

Но это не работает, у меня всегда возникают ошибки типа:


ValueError: R
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<command-3614001298541202> in <module>()
----> 1 algo_corruption(code_erreur,varibale,extrac_base_train)

<command-1181487391956055> in algo_corruption(code_erreur, varibale, data)
     29     if data_join_result.filter(col("code_erreur_choisi") == "replace"):
     30       data_corrp = (data_join_result[varibale_choisie].replace(random.choice(data_join_result.collect()[j][varibale_choisie]),
---> 31                                                                data_join_result.collect()[j][lettre_choisie]))
     32       display(data_corrp)
     33 

/databricks/spark/python/pyspark/sql/types.py in __getitem__(self, item)
   1517             raise KeyError(item)
   1518         except ValueError:
-> 1519             raise ValueError(item)
   1520 
   1521     def __getattr__(self, item):

ValueError: R
...