Как пройти / перебрать Dataframe с помощью pyspark? - PullRequest
0 голосов
/ 12 марта 2020

Я новичок ie для pyspark. Вот моя схема, полученная от mongodb. df.printSchema ()

root
 |-- machine_id: string (nullable = true)
 |-- profiles: struct (nullable = true)
 |    |-- node_a: struct (nullable = true)
 |    |    |-- profile_1: struct (nullable = true)
 |    |    |    |-- duration: string (nullable = true)
 |    |    |    |-- log_count: string (nullable = true)
 |    |    |    |-- log_att: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- count: string (nullable = true)
 |    |    |    |    |    |-- log_content: string (nullable = true)
 |    |    |-- profile_2: struct (nullable = true)
 |    |    |    |-- duration: string (nullable = true)
 |    |    |    |-- log_count: string (nullable = true)
 |    |    |    |-- log_att: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- count: string (nullable = true)
 |    |    |    |    |    |-- log_content: string (nullable = true)
 |    |    |-- profile_3: struct (nullable = true)
 |    |    |-- profile_4: struct (nullable = true)
 |    |    |-- ...
 |    |-- node_b: struct (nullable = true)
 |    |    |-- profile_1: struct (nullable = true)
 |    |    |    |-- duration: string (nullable = true)
 |    |    |    |-- log_count: string (nullable = true)
 |    |    |    |-- log_att: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- count: string (nullable = true)
 |    |    |    |    |    |-- log_content: string (nullable = true)
 |    |    |-- profile_2: struct (nullable = true)
 |    |    |    |-- duration: string (nullable = true)
 |    |    |    |-- log_count: string (nullable = true)
 |    |    |    |-- log_att: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- count: string (nullable = true)
 |    |    |    |    |    |-- log_content: string (nullable = true)
 |    |    |-- profile_3: struct (nullable = true)
 |    |    |-- profile_4: struct (nullable = true)
 |    |    |-- ...

Для каждой машины у меня есть 2 узла, для каждого узла у меня много профилей. Мне нужно получить распределение продолжительности для каждого профиля. Например, для profile_1, count (1 <= duration <2). Какие API-интерфейсы dataframe я могу использовать? Все, о чем я думал, это: 1. выровнять node_a и node_b new_df = df.selectExpr (flatten (df.schema, None, 2)) 2. получить новые фреймы данных для node_a и node_b df_a = new_df.selectExpr ("machine_id", "" node_a ") df_b = new_df.selectExpr (" machine_id "," node_b ") 3. Затем сгладьте df_a и df_b, чтобы я мог иметь 2 кадра данных со схемой ниже: </p>

 |-- machine_id: string (nullable = true)
 |-- profile_1: struct (nullable = true)
 |    |-- duration: string (nullable = true)
 |    |-- log_count: string (nullable = true)
 |    |-- log_att: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- count: string (nullable = true)
 |    |    |    |-- log_content: string (nullable = true)
 |-- profile_2: struct (nullable = true)
 |    |-- duration: string (nullable = true)
 |    |-- log_count: string (nullable = true)
 |    |-- log_att: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- count: string (nullable = true)
 |    |    |    |-- log_content: string (nullable = true)
 |-- profile_3: struct (nullable = true)
 |-- profile_4: struct (nullable = true)
 |-- ...

Я рассматриваю это как очень глупый метод. Есть ли другой "умнее" метод?

1 Ответ

0 голосов
/ 12 марта 2020

Ах ... Я наконец-то нашел новый метод решения этой проблемы. Не уверен, что это хороший способ, но он, безусловно, лучше глупого

def flatten(schema, prefix=None):
    for field in schema.fields:
        dtype = field.dataType
        field_name = field.name
        name = prefix + '.' + field_name if prefix else field_name
        if field_name == "profiles" \
            or re.search(r'machine_[ab]', field_name \
            or re.match(r'profile_\d+', field_name)):
            flatten(dtype, prefix=name)
        elif re.search(r'profile_\d+', name):
            for sub_name in dtype.names:
                sub_names.append(name + '.' + sub_name)
            print(sub_names)
            create_new_table(sub_names)
    return
Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...