pyspark zipWithIndex () to DataFrame - тип столбца изменен - PullRequest
0 голосов
/ 07 ноября 2018

У меня есть Spark DataFrame. Ниже приведен код для генерации примера кадра данных.

     arr = np.array([
['b5ad805c-f295-4852-82fc-961a88',12732936],
['0FD6955D-484C-4FC8-8C3F-DA7D28','Gklb38'],
['0E3D17EA-BEEF-4931-8104',12909841],
['CC2877D0-A15C-4C0A-AD65-762A35C1','12645715'],
['CC2877D0-A15C-4C0A-AD65-762A35C1',12909837],
['6AC9C45D-A891-4BEA-92B1-04224E9C65ED', '12894376'],
['CFF7BAB7-C5E1-490D-B257-AE58CA071362', 'Gklb38' ]])

df_purchases = pd.DataFrame(arr, columns = ['user_id','basket'])
df_spark = spark.createDataFrame(df_purchases)
df_spark.show()

В попытке создать индексы для каждого уникального идентификатора продукта (корзины) я использовал zipWithIndex()

products_only = spark_df[['basket']]
products_df = products_only.distinct()
indexed_products = products_df.rdd.zipWithIndex()

А потом я преобразовал обратно в тип DataFrame:

# convert to spark data frame
products_ind_df = indexed_products.toDF(["product_id", "index"])

Когда я проверил тип, я обнаружил, что это:

products_ind_df.dtypes

Выход:

[('product_id', 'struct<basket:string>'), ('index', 'bigint')]

тогда как:

products_df.dtypes

Выходы:

 [('basket', 'string')]

Мой вопрос, почему тип не:

[('product_id', 'string'), ('index', 'bigint')]

а как поменять его на строку?

1 Ответ

0 голосов
/ 07 ноября 2018

Поскольку products_df.rdd является СДР объекта Row, вам необходимо сначала извлечь basket из каждой строки в виде строки:

products_df.rdd.map(lambda r: r.basket).zipWithIndex().toDF(['product_id', 'index'])
# DataFrame[product_id: string, index: bigint]

Кроме того, если вам просто нужно сопоставить каждый идентификатор продукта с целым числом, вы также можете использовать модуль StringIndexer из ml.feature:

from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import col

stringIndexer = StringIndexer(inputCol="basket", outputCol="index")
model = stringIndexer.fit(df_spark)
df_spark_index = model.transform(df_spark).withColumn("index", col("index").cast("int"))

df_spark_index.show()
+--------------------+--------+-----+
|             user_id|  basket|index|
+--------------------+--------+-----+
|b5ad805c-f295-485...|12732936|    2|
|0FD6955D-484C-4FC...|  Gklb38|    0|
|0E3D17EA-BEEF-493...|12909841|    1|
|CC2877D0-A15C-4C0...|12645715|    5|
|CC2877D0-A15C-4C0...|12909837|    3|
|6AC9C45D-A891-4BE...|12894376|    4|
|CFF7BAB7-C5E1-490...|  Gklb38|    0|
+--------------------+--------+-----+
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...