PySpark - добавление нового столбца в DataFrame - PullRequest
0 голосов
/ 25 февраля 2019

Мне нужно проверить каждый столбец фрейма данных с ожидаемой длиной или нет.Если нет, необходимо добавить один новый столбец во фрейм данных, который заполняет результаты проверки.Мне нужно, чтобы это было написано с использованием Python.

Например, ниже приведена структура моего фрейма данных.И мои критерии проверки: общая длина любых столбцов не должна превышать 3 *.

[col1   col2    col3
=========================
AAA     BBB     CCC
DDDD    EEE     BBBB
AAA     EEEE    BBBB

И я ожидаю вывод, как показано ниже.

col1    col2    col3                   length_check
======================================================================================
AAA     BBB     CCC      
DDDD    EEE     BBBB     Expected Length of col1 is 3; Expected Length of col3 is 3
AAA     EEEE    BBBB     Expected Length of col2 is 3; Expected Length of col3 is 3

Ваш вклад очень важен.Спасибо

Код:

valid_rdd=parsed_file.map(lambda line: line if len(line)==4 else False)
                     .filter(lambda line:line!=False)
invalid_rdd=parsed_file.map(lambda line: line if len(line)!=4 else False)
                       .filter(lambda line:line!=False) 
columns=['colA','colB','colC','colD'] 
df_valid=valid_rdd.toDF(columns) 
df1=df_valid.withColumn('length_check', (f.when (f.length('colA')== 1, "True").otherwise("Expected Column length 1 but found "+ str(f.length('colA') + str(df_valid.colA) ))))

1 Ответ

0 голосов
/ 25 февраля 2019

Ну, во-первых, я не очень разбираюсь в PySpark, также у меня мало опыта работы с Python, но я смог скопировать ваш код (с небольшим рефакторингом).

Прежде чем я покажу вамкод, я думаю, важно упомянуть, что приведенный ниже код основан на этой статье , пытающейся воспроизвести сценарий реального мира (я полагаю, вам удастся получить все необходимые данные, чтобы он работал).

# Assuming this is the returned value from the Data Frame (e.g.parsed_file.columns)
DATA_FRAME_COLUMNS = [
    'Col1',
    'Col2',
    'Col3',
]


def get_rows_from_a_column(column):
     # This dictionary is actually a mock of what should be something like parsed_file.select(column).rows
     return {
         'Col1': ['AAA', 'BBB', 'CCC'],
         'Col2': ['DDDD', 'EEE', 'BBBB'],
         'Col3': ['AAA', 'EEEE', 'BBBB']
     }[column]


def check_column_length(data_frame_columns):
    """
    This function checks for the length of rows that you have accordingly to 
    DATA_FRAME_COLUMNS
    """
    data = {}

    for column in data_frame_columns:
        column_rows = get_rows_from_a_column(column)

        if len(column_rows) != 3:
            raise ValueError('Expected length of ' + column + ' to be 3')

        data[column] = column_rows

    return data


print(check_column_length(DATA_FRAME_COLUMNS)) # {'Col2': ['DDDD', 'EEE', 'BBBB'], 'Col3': ['AAA', 'EEEE', 'BBBB'], 'Col1': ['AAA', 'BBB', 'CCC']}

Если мой совет чего-то стоит, я бы порекомендовал вам взглянуть на Zen of Python , я не знаю вашего уровня с языком (но япредположу, что вы все равно слышали об этом).

Почему я это говорю?Я думаю, что лямбда-функции хороши в некоторых случаях, но они обычно идут вразрез с принципом * 1012.

Кстати, этот материал относится к вашему общему коду, как он выглядит и насколько легко его понять.Лично я бы не использовал его (но это зависит от ситуации).

PS: Я не был уверен до обновления @ barbsan в этом посте, если вы хотите проверить длину строк столбца илидлина символов в строке.Поэтому я предполагаю, что вам нужен первый вариант.

Дайте мне знать, если я что-то пропустил.

...