объединение двух искровых фреймов данных в одну схему с использованием python - PullRequest
0 голосов
/ 29 января 2020

У меня есть два разных фрейма данных pyspark, которые нужно объединить в один. Существует некоторая логика c, которая должна быть закодирована для объединения. Один из фреймов данных имеет следующую схему: (id, type, count), а другой - схему: (id, timestamp, test1, test2, test3)

Первый фрейм данных создается с помощью sql группировать запрос. Могут быть повторяющиеся идентификаторы, но тип будет отличаться для идентификаторов. И есть связанный счетчик для данного типа.

В окончательной схеме (объединенной) будут разные столбцы для счетчика типов. Данные подсчета извлекаются из первой схемы.

Пример окончательной схемы: (id, отметка времени, test1, test2, test3, type1count, type2count, type3count)

Теперь я использую два цикла for для построения словаря , У меня есть пустая схема, и я использую словарь для обновления схемы. Если я делаю это таким образом, я на самом деле не использую функции искры.

schema1: (id, type, count) -- type has the values type1, type2, type3
schema2: (id, timestamp, test1, test2, test3)
finalschema: (id, timestamp, test1, test2, test3, type1count, type2count, type3count)

Кто-нибудь есть какие-либо предложения о том, как это можно улучшить?

Большое спасибо заранее.

Ответы [ 2 ]

2 голосов
/ 29 января 2020

Вы можете использовать функцию Pyspark pivot , чтобы повернуть первый фрейм данных перед тем, как присоединить его ко второму

Рабочий пример:

import pyspark.sql.functions as F
import pyspark.sql.functions as F
df = spark.createDataFrame([[1,'type1',10],
                            [1,'type2',10],
                            [1,'type3',10]],
                           schema=['id','type','quantity'])

df = df.groupBy('id').pivot('type').sum('quantity')
display(df)

Вы можете изменить агрегация по вашему желанию.

0 голосов
/ 29 января 2020

Вы можете объединить два столбца данных в столбце идентификатора, ниже приведен пример фрагмента кода для того же самого,

df1 schema is (id, type, count).
df2 schema is (id, timestamp, test1, test2, test3, type1count, type2count, type3count)

merged_df = df1.join(df2, on=['id'], how='left_outer')

Надеюсь, это поможет.

Добро пожаловать на сайт PullRequest, где вы можете задавать вопросы и получать ответы от других членов сообщества.
...