Конвертировать RDD в Dataframe в FPGrowth Pyspark - PullRequest
0 голосов
/ 06 июня 2018

Я обнаружил ошибку, когда сделал DataFrame из RDD.

from pyspark.ml.fpm import FPGrowth

sogou = sc.textFile("SogouQ.sample.utf8", use_unicode = False)

def parse(line):
    value = [ x for x in line.split(",") if x]
    return list(set(value))

rdd = sogou.map(parse)
df = sogou.toDF('items')

Я получаю следующую ошибку:

pyspark.sql.utils.ParseException: u "\ nmismatched input '' ожидающий {'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'МЕЖДУ', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'КОГДА', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'ЕСТЕСТВЕННЫЙ »,« ВКЛ »,« Боковой »,« ОКНО »,« СВЕРХ »,« РАЗДЕЛ »,« ДИАПАЗОН »,« СТРОКИ »,« НЕБОЛЬШОЙ »,« ПРЕДВАРИТЕЛЬНЫЙ »,« СЛЕДУЮЩИЙ »,« ТОК »,« ПЕРВЫЙ », 'AFTER', 'LAST', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'DIRECTORY', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO »,« DESCRIBE »,« EXPLAIN »,« FORMAT »,« LOGICAL »,« CODEGEN »,« COST »,« CAST »,« SHOW »,« TABLES »,« COLUMNS »,« COLUMN »,« USE », «PARTITIONS», «FUNCTIONS», «DROP», «UNION ',' EXCEPT ',' MINUS ',' INTERSECT ',' TO ',' TABLESAMPLE ',' STRATIFY ',' ALTER ',' RENAME ',' ARRAY ',' MAP ',' STRUCT ',' COMMENT ', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IGNORE', 'BOTH', 'LEADING', 'TRAILING', 'IF ',' POSITION ',' DIV ',' PERCENT ',' BUCKET ',' OUT ',' OF ',' SORT ',' CLUSTER ',' DISTRIBUTE ',' OVERWRITE ',' TRANSFORM ',' REDUCE ', «SERDE», «SERDEPROPERTIES», «RECORDREADER», «RECORDWRITER», «DELIMITED», «FIELDS», «TERMINATED», «COLLECTION», «ITEMS», «KEYS», «ESCAPED», «LINES», «SEPARATED ',' FUNCTION ',' EXTENDED ',' REFRESH ',' CLEAR ',' CACHE ',' UNCACHE ',' LAZY ',' FORMATTED ',' GLOBAL ', TEMPORARY,' OPTIONS ',' UNSET ','TBLPROPERTIES ',' DBPROPERTIES ',' BUCKETS ',' SKEWED ',' STORED ',' DIRECTORIES ',' LOCATION ',' EXCHANGE ',' ARCHIVE ',' UNARCHIVE ',' FILEFORMAT ',' TOUCH ',' COMPACT ', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', База данных, Базы данных, 'DFS', 'TRUNCATE', 'ANALYZE ',' COMPUTE ',' LIST ',' STATISTICS ',' PARTITIONED ',' EXTERNAL ',' DEFINED ',' REVOKE ',' GRANT ',' LOCK ',' UNLOCK ',' MSCK ',' REPAIR ',' RECOVER ',' EXPORT ', «ИМПОРТ», «НАГРУЗКА», «РОЛЬ», «РОЛИ», «СЖАТИЯ», «ПРИНЦИПЫ», «СДЕЛКИ», «ИНДЕКС», «ИНДЕКСЫ», «ЗАМКИ», «ОПЦИЯ», «АНТИ», «LOCAL ',' INPATH ', IDENTIFIER, BACKQUOTED_IDENTIFIER} (строка 1, позиция 5) \ n \ n == SQL == \ nitems \ n ----- ^^^ \ n "

Текст содержит Chinese.Это имеет значение?Текст выглядит так:

360,安全卫士,
123,123,范冰冰,

Когда я использую pyspark.mllib.fpgrowth, rdd работает нормально.Как я могу преобразовать это в dataframe?

1 Ответ

0 голосов
/ 06 июня 2018

Здесь есть две разные проблемы:

  • toDF вызов.RDD.toDF имеет следующую подпись:

    Signature: rdd.toDF(schema=None, sampleRatio=None)
    

    , где schema должно быть

    схема параметров: pyspark.sql.types.StructType или список имен столбцов

    Итак, в вашем случае это должно быть:

    sogou.toDF(["items"])
    
  • parse метод:

    createDataFrame метод, вызываемый df ожидает RDD[tuple] или эквивалент, который может быть сопоставлен с structs, если не указана схема.Если вы хотите использовать только имя, оно должно вернуть tuple

    def parse(line):
        value = [ x for x in line.split(",") if x]
        return list(set(value)),  
    

Комбинированный:

>>> def parse(line):
...     value = [ x for x in line.split(",") if x]
...     return list(set(value)),  
... 
... 
>>> rdd = sc.parallelize(["360,安全卫士,", "123,123,范冰冰,"])
>>> rdd.map(parse).toDF(["items"]).show()
+--------------+
|         items|
+--------------+
|   [安全卫士, 360]|
|[123,123,范冰冰,]|
+--------------+

Альтернатива (сохраняя текущую реализацию анализа)будет

>>> from pyspark.sql.types import ArrayType, StringType
>>> def parse(line):
...     value = [ x for x in line.split(",") if x]
...     return list(set(value))
    >>> rdd.map(parse).toDF(ArrayType(StringType())).toDF("items").show()
+--------------+     
|         items|
+--------------+
|   [安全卫士, 360]|
|[123,123,范冰冰,]|
+--------------+
...