Преобразование кода API на основе RDD в код API на основе Dataframe в pyspark - PullRequest
0 голосов
/ 25 апреля 2019

Код API на основе RDI для чтения CSV-файла и преобразования его в кортежи:

# load data
movie_rating = sc.textFile('140419_Movie_Rating.csv')
# preprocess data -- only need ["userId", "movieId", "rating"]
header = movie_rating.take(1)[0]
rating_data = movie_rating \
    .filter(lambda line: line!=header) \
    .map(lambda line: line.split(",")) \
    .map(lambda tokens: (int(tokens[0]), int(tokens[1]), int(tokens[2]))) \
    .cache()
# check three rows
rating_data.take(3)

вывод:

[(6156680, 433441, 2), (6156680, 433400, 1), (6156680, 433400, 1)]

По сути, я читаю файл CSV с помощью API на основе RDD из pyspark.mllb, чтобы перезагрузить sdata с помощью sc.textFile и преобразовать его в форму (user_id, video_id, rating)

Теперь, если мне нужно выполнить ту же операцию, используя код API на основе Dataframe? Как этого добиться?

Ответы [ 2 ]

1 голос
/ 25 апреля 2019

API Spark Dataframe поддерживает чтение CSV-файлов с разделителем.

Давайте создадим наш CSV-файл:

import pandas as pd
pd.DataFrame([(6156680, 433441, 2), (6156680, 433400, 1), (6156680, 433400, 1)], columns=['user_id', 'video_id', 'rating']) \
    .to_csv('140419_Movie_Rating.csv', index=False)

Теперь мы можем прочитать файл с заголовком, по умолчанию используется разделитель ', ':

df = spark.read.csv('140419_Movie_Rating.csv', header=True, inferSchema=True)
df.show()
df.printSchema()

        +-------+--------+------+
        |user_id|video_id|rating|
        +-------+--------+------+
        |6156680|  433441|     2|
        |6156680|  433400|     1|
        |6156680|  433400|     1|
        +-------+--------+------+

        root
         |-- user_id: integer (nullable = true)
         |-- video_id: integer (nullable = true)
         |-- rating: integer (nullable = true)

0 голосов
/ 25 апреля 2019

Попробуйте это:

rating_data_df = spark.read.format('csv')\
    .option('header', 'true')\
    .option('inferSchema', 'true')\
    .load('140419_Movie_Rating.csv')
rating_data_df.take(3)

В вашем случае это должно вывести что-то вроде этого:

[Row(userId=6156680, movieId=433441, rating=2), Row(userId=6156680, movieId=433400, rating=1), Row(userId=6156680, movieId=433400, rating=1)]

Подробнее об этих универсальных функциях можно прочитать здесь: https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...