Путаница в типах Spark RDD - PullRequest
       24

Путаница в типах Spark RDD

0 голосов
/ 29 января 2020

Я только изучаю Spark, начал с RDD и теперь перехожу к DataFrames. В моем текущем проекте pyspark я читаю файл S3 в RDD и выполняю некоторые простые преобразования для них. Вот код

segmentsRDD = sc.textFile(fileLocation). \
    filter(lambda line: line.split(",")[6] in INCLUDE_SITES). \
    filter(lambda line: line.split(",")[2] not in EXCLUDE_MARKETS). \
    filter(lambda line: "null" not in line). \
    map(splitComma). \
    filter(lambda line: line.split(",")[5] == '1')

SplitComma - это функция, которая выполняет некоторые вычисления даты для данных строки и возвращает обратно 10 полей, разделенных запятыми. Как только я получаю, что я запускаю последний фильтр, как показано, только для тех строк извлечения, где значение в поле [5] = 1. Пока все в порядке.

Далее, я хотел бы преобразовать сегменты RDD в DF со схемой как показано ниже.

interim_segmentsDF = segmentsRDD.map(lambda x: x.split(",")).toDF("itemid","market","itemkey","start_offset","end_offset","time_shifted","day_shifted","tmsmarketid","caption","itemstarttime")

Но я получаю сообщение об ошибке из-за невозможности преобразования «pyspark.rdd.PipelinedRDD» в DataFrame. Не могли бы вы объяснить разницу между "pyspark.rdd.PipelinedRDD" и "row RDD"? Я пытаюсь преобразовать в DF со схемой, как показано. Что мне здесь не хватает?

Спасибо

1 Ответ

0 голосов
/ 29 января 2020

Вы должны добавить следующие строки в свой код:

from pyspark.sql import SparkSession
spark = SparkSession(sc)

Метод .toDF() не является оригинальным методом rdd. Если вы посмотрите на исходный код Spark , вы увидите, что метод .toDF() представляет собой патч обезьяны .

Итак, при инициализации SparkSession вы вызываете эту обезьяну метод с использованием кэша; другими словами, когда вы запускаете rdd.toDF(), вы напрямую запускаете метод .toDF() из API Dataframe.

...