Replace a cell value based on the value of another cell in a dataframe
0 votes
/ 08 October 2019

First, I'd like to state that I cannot use pandas. What I'm trying to do is replace a cell value in a dataframe when that cell value matches a certain predefined value, and otherwise leave the cell value as it was originally.
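
For a single dataframe and a single predefined value, I understand the basic pattern would be something like this (a minimal sketch; `target_value` and `new_value` are just placeholders for illustration):

    from pyspark.sql.functions import when, lit

    # hypothetical values for illustration
    target_value = 299.6655053883315
    new_value = 0.0

    # keep the cell as it is unless it matches the predefined value
    predictions = predictions.withColumn(
        "prediction",
        when(predictions.prediction == lit(target_value), lit(new_value))
        .otherwise(predictions.prediction)
    )

My problem is that the replacement value has to come from other dataframes.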

Here is what I have tried so far:

    predictions = crossval.fit(trainingData).transform(trainingData)
    bins = predictions.select("prediction").distinct().collect()
    for row in bins:
        rows = predictions.select(["features", "prediction"]).filter(predictions.prediction == row.prediction).withColumnRenamed("prediction", "prediction_1")
        dt_model = dt.fit(rows)
        dt_transform = dt_model.transform(testData).select("prediction")
        predictions = predictions.withColumn("prediction", when(predictions.prediction == rows.prediction_1, dt_transform.prediction).otherwise(predictions.prediction))

The line that is giving me trouble is:

predictions = predictions.withColumn("prediction", when(predictions.prediction == rows.prediction_1, dt_transform.prediction).otherwise(predictions.prediction))

The error it gives me is:

Traceback (most recent call last):
  File "part2.py", line 114, in <module>
    main()
  File "part2.py", line 108, in main
    predictions = predictions.withColumn("prediction", when(predictions.prediction == rows.prediction_1, dt_transform.prediction).otherwise(predictions.prediction))
  File "/opt/spark/python/pyspark/sql/dataframe.py", line 1990, in withColumn
    return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
  File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/spark/python/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u'Resolved attribute(s) prediction#3065,prediction_1#2949 missing from features#200,trip_duration#20,prediction#2925 in operator !Project [features#200, trip_duration#20, CASE WHEN (prediction#2925 = prediction_1#2949) THEN prediction#3065 ELSE prediction#2925 END AS prediction#3070]. Attribute(s) with the same name appear in the operation: prediction. Please check if the right attribute(s) are used.;;\n!Project [features#200, trip_duration#20, CASE WHEN (prediction#2925 = prediction_1#2949) THEN prediction#3065 ELSE prediction#2925 END AS prediction#3070]\n+- Project [features#200, trip_duration#20, UDF(features#200) AS prediction#2925]\n   +- Sample 0.0, 0.8, false, 3709578444707833222\n      +- Sort [features#200 ASC NULLS FIRST, trip_duration#20 ASC NULLS FIRST], false\n         +- Project [features#200, trip_duration#20]\n            +- Project [vendor_id#11, passenger_count#14, store_and_fwd_flag#178, trip_duration#20, distance#33, year#98, month#107, day#117, hour#128, minute#140, second#153, UDF(named_struct(vendor_id_double_VectorAssembler_42efd84316ac, cast(vendor_id#11 as double), passenger_count_double_VectorAssembler_42efd84316ac, cast(passenger_count#14 as double), store_and_fwd_flag_double_VectorAssembler_42efd84316ac, cast(store_and_fwd_flag#178 as double), distance_double_VectorAssembler_42efd84316ac, cast(distance#33 as double), year_double_VectorAssembler_42efd84316ac, cast(year#98 as double), month_double_VectorAssembler_42efd84316ac, cast(month#107 as double), day_double_VectorAssembler_42efd84316ac, cast(day#117 as double), hour_double_VectorAssembler_42efd84316ac, cast(hour#128 as double), minute_double_VectorAssembler_42efd84316ac, cast(minute#140 as double), second_double_VectorAssembler_42efd84316ac, cast(second#153 as double))) AS features#200]\n               +- Project [vendor_id#11, passenger_count#14, <lambda>(store_and_fwd_flag#19) AS store_and_fwd_flag#178, trip_duration#20, distance#33, year#98, month#107, day#117, hour#128, minute#140, second#153]\n                  +- Project [vendor_id#11, passenger_count#14, store_and_fwd_flag#19, trip_duration#20, distance#33, year#98, month#107, day#117, hour#128, minute#140, second#153]\n                     +- Project [vendor_id#11, pickup_datetime#12, passenger_count#14, store_and_fwd_flag#19, trip_duration#20, distance#33, year#98, month#107, day#117, hour#128, minute#140, <lambda>(pickup_datetime#12) AS second#153]\n                        +- Project [vendor_id#11, pickup_datetime#12, passenger_count#14, store_and_fwd_flag#19, trip_duration#20, distance#33, year#98, month#107, day#117, hour#128, <lambda>(pickup_datetime#12) AS minute#140]\n                           +- Project [vendor_id#11, pickup_datetime#12, passenger_count#14, store_and_fwd_flag#19, trip_duration#20, distance#33, year#98, month#107, day#117, <lambda>(pickup_datetime#12) AS hour#128]\n                              +- Project [vendor_id#11, pickup_datetime#12, passenger_count#14, store_and_fwd_flag#19, trip_duration#20, distance#33, year#98, month#107, <lambda>(pickup_datetime#12) AS day#117]\n                                 +- Project [vendor_id#11, pickup_datetime#12, passenger_count#14, store_and_fwd_flag#19, trip_duration#20, distance#33, year#98, <lambda>(pickup_datetime#12) AS month#107]\n                                    +- Project [vendor_id#11, pickup_datetime#12, passenger_count#14, store_and_fwd_flag#19, trip_duration#20, distance#33, <lambda>(pickup_datetime#12) AS year#98]\n                  
                     +- Project [vendor_id#11, pickup_datetime#12, passenger_count#14, store_and_fwd_flag#19, trip_duration#20, distance#33]\n                                          +- Project [vendor_id#11, pickup_datetime#12, dropoff_datetime#13, passenger_count#14, store_and_fwd_flag#19, trip_duration#20, distance#33]\n                                             +- Project [vendor_id#11, pickup_datetime#12, dropoff_datetime#13, passenger_count#14, dropoff_latitude#18, store_and_fwd_flag#19, trip_duration#20, distance#33]\n                                                +- Project [vendor_id#11, pickup_datetime#12, dropoff_datetime#13, passenger_count#14, dropoff_longitude#17, dropoff_latitude#18, store_and_fwd_flag#19, trip_duration#20, distance#33]\n                                                   +- Project [vendor_id#11, pickup_datetime#12, dropoff_datetime#13, passenger_count#14, pickup_latitude#16, dropoff_longitude#17, dropoff_latitude#18, store_and_fwd_flag#19, trip_duration#20, distance#33]\n                                                      +- Project [vendor_id#11, pickup_datetime#12, dropoff_datetime#13, passenger_count#14, pickup_longitude#15, pickup_latitude#16, dropoff_longitude#17, dropoff_latitude#18, store_and_fwd_flag#19, trip_duration#20, distance#33]\n                                                         +- Project [id#10, vendor_id#11, pickup_datetime#12, dropoff_datetime#13, passenger_count#14, pickup_longitude#15, pickup_latitude#16, dropoff_longitude#17, dropoff_latitude#18, store_and_fwd_flag#19, trip_duration#20, <lambda>(pickup_longitude#15, pickup_latitude#16, dropoff_longitude#17, dropoff_latitude#18) AS distance#33]\n                                                            +- Relation[id#10,vendor_id#11,pickup_datetime#12,dropoff_datetime#13,passenger_count#14,pickup_longitude#15,pickup_latitude#16,dropoff_longitude#17,dropoff_latitude#18,store_and_fwd_flag#19,trip_duration#20] csv\n'

So far I have figured out that if I replace rows.prediction_1 and dt_transform.prediction with predictions.prediction, it works, just not the way it should. So something is wrong with those two dataframes.

The output of predictions.show():

+--------------------+-------------+------------------+
|            features|trip_duration|        prediction|
+--------------------+-------------+------------------+
|[1.0,0.0,0.0,0.0,...|            8| 299.6655053883315|
|[1.0,0.0,0.0,0.02...|            9| 299.6655053883315|
|[1.0,0.0,0.0,15.1...|         2251|2659.7614115841966|
|[1.0,1.0,0.0,0.0,...|           37| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|         1084| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|          570| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|          599| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|           21| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|            6| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|           19| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|          177| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|           44| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|           35| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|           60| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|           79| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|           73| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|          705| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|          580| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|           67| 299.6655053883315|
|[1.0,1.0,0.0,0.0,...|          640| 299.6655053883315|
+--------------------+-------------+------------------+

1 Answer

1 vote
/ 12 October 2019

Note 1: dt_transform = dt_model.transform(testData).select("prediction") does not make much sense here, because testData and rows do not have the same number of rows. You will not be able to map the new testData predictions back onto the rows predictions in the next line with the when function, because when operates row by row within a single dataframe. A join would be a better choice.

Note 2: predictions = predictions.withColumn("prediction", when(predictions.prediction == rows.prediction_1, dt_transform.prediction).otherwise(predictions.prediction)) is not valid. You cannot reference more than one dataframe in that operation (you referenced three: predictions, rows, and dt_transform). If you want to pull in or compare values from other dataframes, use a join.
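
For illustration, a minimal sketch of the join idea (this assumes you keep a shared key column, here a hypothetical id, in both dataframes so the rows can be matched; when alone cannot look across dataframes):

import pyspark.sql.functions as F

# hypothetical: dt_transform keeps a key column "id" alongside its new prediction
new_preds = dt_transform.select("id", F.col("prediction").alias("new_prediction"))

# bring the re-estimated prediction onto the original rows via the key
joined = predictions.join(new_preds, on="id", how="left")

# use the new prediction where one exists, otherwise keep the original
predictions = (joined
               .withColumn("prediction",
                           F.coalesce(F.col("new_prediction"), F.col("prediction")))
               .drop("new_prediction"))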

Here is a small example I put together to help you with your two-stage estimation method.

Stage 1 - run the estimation over all of the data to get preliminary predictions.

Stage 2 - organize the data into subgroups (grouped by the preliminary predictions), re-estimate, and update the predictions.

Note: I demonstrate the idea with clustering (KMeans); however, the example can be adapted to your regression case.

Code

from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, DoubleType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
import pyspark.sql.functions as F

# get or create a SparkSession so the example is self-contained
spark = SparkSession.builder.getOrCreate()

#create a sample data frame
data = [(1.54,3.45,2.56),(9.39,8.31,1.34),(1.25,3.31,9.87),(9.35,5.67,2.49),\
        (1.23,4.67,8.91),(3.56,9.08,7.45),(6.43,2.23,1.19),(7.89,5.32,9.08)]

fields = [StructField('a', DoubleType(),True),
          StructField('b', DoubleType(),True),
          StructField('c', DoubleType(),True)
         ]

schema = StructType(fields)

df = spark.createDataFrame(data, schema)

df.show()

# +----+----+----+
# |   a|   b|   c|
# +----+----+----+
# |1.54|3.45|2.56|
# |9.39|8.31|1.34|
# |1.25|3.31|9.87|
# |9.35|5.67|2.49|
# |1.23|4.67|8.91|
# |3.56|9.08|7.45|
# |6.43|2.23|1.19|
# |7.89|5.32|9.08|
# +----+----+----+

#Stage 1

assembler = VectorAssembler(inputCols=['a','b','c'],outputCol='features')

df_trans = assembler.transform(df)

kmeans = KMeans(k=3, seed=123)

km_model = kmeans.fit(df_trans)

predictions = km_model.transform(df_trans)

predictions.orderBy('prediction').show()

# +----+----+----+----------------+----------+
# |   a|   b|   c|        features|prediction|
# +----+----+----+----------------+----------+
# |1.25|3.31|9.87|[1.25,3.31,9.87]|         0|
# |1.23|4.67|8.91|[1.23,4.67,8.91]|         0|
# |3.56|9.08|7.45|[3.56,9.08,7.45]|         0|
# |7.89|5.32|9.08|[7.89,5.32,9.08]|         0|
# |9.39|8.31|1.34|[9.39,8.31,1.34]|         1|
# |9.35|5.67|2.49|[9.35,5.67,2.49]|         1|
# |1.54|3.45|2.56|[1.54,3.45,2.56]|         2|
# |6.43|2.23|1.19|[6.43,2.23,1.19]|         2|
# +----+----+----+----------------+----------+

# Stage 2
bins = predictions.select("prediction").distinct().collect()

count = 0
for row in bins:
    count+=1
    #create a sub dataframe for each unique prediction and re-estimate
    sub_df = (predictions.filter(F.col('prediction')==row.prediction)
              .select(['features','prediction'])
              .withColumnRenamed('prediction','previous_prediction')
              )
    sub_model = kmeans.fit(sub_df)
    sub_predictions = sub_model.transform(sub_df)

    #initialize if it is the first loop iteration, otherwise merge (union) rows
    if count==1:
        updated_predictions = sub_predictions
    else:
        updated_predictions = updated_predictions.union(sub_predictions)

Output

updated_predictions.orderBy('previous_prediction').withColumnRenamed('prediction','updated_prediction').show()

# +----------------+-------------------+------------------+
# |        features|previous_prediction|updated_prediction|
# +----------------+-------------------+------------------+
# |[1.25,3.31,9.87]|                  0|                 1|
# |[1.23,4.67,8.91]|                  0|                 1|
# |[3.56,9.08,7.45]|                  0|                 0|
# |[7.89,5.32,9.08]|                  0|                 2|
# |[9.39,8.31,1.34]|                  1|                 0|
# |[9.35,5.67,2.49]|                  1|                 1|
# |[1.54,3.45,2.56]|                  2|                 0|
# |[6.43,2.23,1.19]|                  2|                 1|
# +----------------+-------------------+------------------+
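
For the regression case from the question, the same two-stage structure could look roughly like this. This is only a sketch under assumptions: trainingData has a features column and a trip_duration label (as in the predictions.show() output above), and a plain DecisionTreeRegressor stands in for the cross-validated estimator.

from pyspark.ml.regression import DecisionTreeRegressor
import pyspark.sql.functions as F

dt = DecisionTreeRegressor(featuresCol='features', labelCol='trip_duration')

# Stage 1: fit on all training data to get preliminary predictions
stage1_model = dt.fit(trainingData)
predictions = stage1_model.transform(trainingData)

# Stage 2: re-estimate within each group of identical preliminary predictions
bins = predictions.select('prediction').distinct().collect()

updated_predictions = None
for row in bins:
    sub_df = (predictions
              .filter(F.col('prediction') == row.prediction)
              .withColumnRenamed('prediction', 'previous_prediction'))
    sub_model = dt.fit(sub_df)
    sub_predictions = sub_model.transform(sub_df)
    updated_predictions = (sub_predictions if updated_predictions is None
                           else updated_predictions.union(sub_predictions))

As in the KMeans example, the updated predictions are collected with union rather than written back with when, so no cross-dataframe column references are needed.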