Проблема с PySpark DataFrame - PullRequest
0 голосов
/ 07 января 2020

При использовании pyspark у меня возникают проблемы с моим фреймом данных, который не сгруппирован так, как мне бы хотелось. В приведенном ниже примере в столбце ANALYTIC хотелось бы, чтобы значения были различны, чтобы значения трендов можно было видеть по месяцам. Как я могу выполнить sh это?

 ***df Dataframe consists following***
-----'df' DATAFRAME CONSISTS OF FOLLOWING ----


+-----------+----+-----+------+-------------------+
|CLIENT_NAME|YEAR|MONTH|ENGINE|TOTAL_UNIQUE_MEMBER|
+-----------+----+-----+------+-------------------+
|   Paax    |2019|   12|  ERG2|             435911|
|   Paax    |2019|   11|   ELE|             435911|
|   Paax    |2019|   11|   PHA|             435911|
|   Paax    |2019|   12|   ELE|             435911|
|   Paax    |2019|   12|   EBM|             512518|
|   Paax    |2019|   12|   PHA|             435911|
+-----------+----+-----+------+-------------------+


I m taking above values and keeping in dictionary 
and getted those values from dictionary and 
assigned to 'list of tuple' and finally tuple is added to some columns i.e Dataframe

Я пытался:

import os
import glob
import pandas as pd
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()


path = "/Users/ash2/Desktop/new_results/prodd"
mon_dict = {'01':'Jan','02':'Feb','03':'Mar','04':'Apr','05':'May','06':'Jun','07':'Jul','08':'Aug','09':'Sep','10':'Oct','11':'Nov','12':'Dec'}

def get_list_dirs(path):
        lst = os.listdir(path)
        if '.DS_Store' in lst:
            lst.remove('.DS_Store')
        return lst
for i in get_list_dirs(path):
    output_path = "/Users/ash2/Desktop"+os.sep+"out"
    #below taking part files which consists of rows and columns and values seperated by delimeter
    all_filenames =  glob.glob(path + os.sep + i + os.sep + '2019' + os.sep + '*' +os.sep + 'uniqueMemberReport' +os.sep + 'part*')
    df = spark.read.format("csv").option("header", "true").option('delimiter', '|').load(all_filenames)
    tup = []
    l =[]
    #df.show()
    df.persist()
    for i in range(1,df.count()+1):
        k = df.take(i)
        d = k[i-1].asDict()
        client = d['CLIENT_NAME']
        month = d['MONTH']
        anlytic = d['ENGINE']
        count = d['TOTAL_UNIQUE_MEMBER']
        y_m = mon_dict[month] + ' - 2019'
        l.append(anlytic)
        l1 = list(dict.fromkeys(l))
        if(month == '12') :
            tup.append((anlytic,'','','','','','','','','','','',count)) 
        if(month == '11' and anlytic in l1) :
            tup.append((anlytic,'','','','','','','','','','',count,''))

    #tup.append(('','','','','','','','','','','','',''))
strong text
    df_text = spark.createDataFrame(tup, ['ANALYTIC','JAN','FEB','MAR','APR','MAY','JUN','JUL','AUG','SEP','OCT','NOV','DEC'])
    print(df_text.show()) [Code Image][1]

Мой вывод

+--------+---+---+---+---+---+---+---+---+---+---+------+------+
|ANALYTIC|JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|   NOV|   DEC|
+--------+---+---+---+---+---+---+---+---+---+---+------+------+
|    ERG2|   |   |   |   |   |   |   |   |   |   |      |435911|
|     ELE|   |   |   |   |   |   |   |   |   |   |435911|      |
|     PHA|   |   |   |   |   |   |   |   |   |   |435911|      |
|     ELE|   |   |   |   |   |   |   |   |   |   |      |435911|
|     EBM|   |   |   |   |   |   |   |   |   |   |      |512518|
|     PHA|   |   |   |   |   |   |   |   |   |   |      |435911|
+--------+---+---+---+---+---+---+---+---+---+---+------+------+

Ожидаемый результат

--------+---+---+---+---+---+---+---+---+---+---+------+------+
|ANALYTIC|JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|   NOV|   DEC|
+--------+---+---+---+---+---+---+---+---+---+---+------+------+
|    ERG2|   |   |   |   |   |   |   |   |   |   |      |435911|
|     ELE|   |   |   |   |   |   |   |   |   |   |435911|435911|
|     PHA|   |   |   |   |   |   |   |   |   |   |435911|435911|
|     EBM|   |   |   |   |   |   |   |   |   |   |      |512518|
+--------+---+---+---+---+---+---+---+---+---+---+------+------+

' Не получается ожидаемый результат, как показано выше

Ответы [ 2 ]

0 голосов
/ 08 января 2020

Вам нужно pivot на информационном кадре, попробуйте это:

df.groupby('ENGINE').pivot("MONTH").agg(F.max('TOTAL')).show()
0 голосов
/ 08 января 2020

Не уверен, что это должно делать: l1 = list(dict.fromkeys(l)), но этот блок кода добавляет две строки в список кортежей, если выполнены условия для одного и того же аналитика c:

if(month == '12') :
        tup.append((anlytic,'','','','','','','','','','','',count)) 
if(month == '11' and anlytic in l1) :
        tup.append((anlytic,'','','','','','','','','','',count,''))

Попробуйте добавить значения DE C и NOV в один и тот же кортеж. Также может помочь использование NamedTuples.

if(month == '12') :
        (analytic)+('',)*8+(count)
        tup.append((anlytic,'','','','','','','','','','','',count)) 
if(month == '11' and anlytic in l1) :
        tup.append((anlytic,'','','','','','','','','','',count,tup[analytic_index][-1]))
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...