Перекрестное соединение между двумя фреймами данных, которое зависит от общего столбца - PullRequest
0 голосов
/ 19 апреля 2019

Кросс-джойн можно сделать следующим образом:

df1 = pd.DataFrame({'subgroup':['A','B','C','D']})
df2 = pd.DataFrame({'dates':pd.date_range(date_today, date_today + timedelta(3), freq='D')})
sdf1 = spark.createDataFrame(df1)
sdf2 = spark.createDataFrame(df2)

sdf1.crossJoin(sdf2).toPandas()

В этом примере есть два кадра данных, каждый из которых содержит 4 строки, в конце я получаю 16 строк.

Однако для моей проблемы я хотел бы сделать перекрестное объединение для пользователя, а пользователь - это еще один столбец в двух фреймах данных, например ::

.
df1 = pd.DataFrame({'user':[1,1,1,1,2,2,2,2],'subgroup':['A','B','C','D','A','B','D','E']})
df2 = pd.DataFrame({'user':[1,1,1,1,2,2,2,2],'dates':np.hstack([np.array(pd.date_range(date_today, date_today + timedelta(3), freq='D')),np.array(pd.date_range(date_today+timedelta(1), date_today + timedelta(4), freq='D'))])})

Результатом применения crossJoin для каждого пользователя должен быть кадр данных с 32 строками. Возможно ли это в pyspark и как это сделать?

1 Ответ

0 голосов
/ 20 апреля 2019

Перекрестное соединение - это объединение, которое генерирует умножение строк, поскольку ключ объединения не идентифицирует строки однозначно (в нашем случае ключ объединения тривиален или вообще не существует ключа объединения)

Начнем с примеров фреймов данных:

import pyspark.sql.functions as psf
import pyspark.sql.types as pst
df1 = spark.createDataFrame(
    [[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())], 
    schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value1']]))
df2 = spark.createDataFrame(
    [[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())], 
    schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value2']]))

        +----+------+
        |user|value1|
        +----+------+
        |   0|    76|
        |   1|    59|
        |   0|    14|
        |   1|    71|
        |   0|    66|
        |   1|    61|
        |   0|     2|
        |   1|    22|
        |   0|    16|
        |   1|    83|
        +----+------+

        +----+------+
        |user|value2|
        +----+------+
        |   0|    65|
        |   1|    81|
        |   0|    60|
        |   1|    69|
        |   0|    21|
        |   1|    61|
        |   0|    98|
        |   1|    76|
        |   0|    40|
        |   1|    21|
        +----+------+

Давайте попробуем объединить кадры данных в постоянном столбце, чтобы увидеть эквивалентность между перекрестным соединением и регулярным соединением в постоянном (тривиальном) столбце:

df = df1.withColumn('key', psf.lit(1)) \
    .join(df2.withColumn('key', psf.lit(1)), on=['key'])

Мы получаем ошибку от spark> 2, потому что она понимает, что мы пытаемся выполнить перекрестное соединение (декартово произведение)

Py4JJavaError: Произошла ошибка при вызове o1865.showString. : org.apache.spark.sql.AnalysisException: обнаружен неявный декартовой продукт для INNER соединения между логическими планами LogicalRDD [пользователь # 1538, значение1 # 1539], ложь а также LogicalRDD [user # 1542, value2 # 1543], false Условие соединения отсутствует или тривиально. Либо: используйте синтаксис CROSS JOIN, чтобы разрешить декартовы произведения между этими отношения, или: включить неявные декартовы произведения, установив конфигурацию переменная spark.sql.crossJoin.enabled = true;

Если ваш присоединяемый ключ (user здесь) не является столбцом, который уникально идентифицирует строки, вы также получите умножение строк, но внутри каждой группы user:

df = df1.join(df2, on='user')
print("Number of rows : \tdf1: {} \tdf2: {} \tdf: {}".format(df1.count(), df2.count(), df.count()))

        Number of rows :    df1: 10     df2: 10     df: 50

        +----+------+------+
        |user|value1|value2|
        +----+------+------+
        |   1|    59|    81|
        |   1|    59|    69|
        |   1|    59|    61|
        |   1|    59|    76|
        |   1|    59|    21|
        |   1|    71|    81|
        |   1|    71|    69|
        |   1|    71|    61|
        |   1|    71|    76|
        |   1|    71|    21|
        |   1|    61|    81|
        |   1|    61|    69|
        |   1|    61|    61|
        |   1|    61|    76|
        |   1|    61|    21|
        |   1|    22|    81|
        |   1|    22|    69|
        |   1|    22|    61|
        |   1|    22|    76|
        |   1|    22|    21|
        +----+------+------+

5 * 5 строк для пользователя 0 + 5 * 5 строк для пользователя 1, следовательно, 50

Примечание: Использование self join с последующим filter обычно означает, что вы должны вместо этого использовать оконные функции .

...