Перекрестное соединение - это объединение, которое генерирует умножение строк, поскольку ключ объединения не идентифицирует строки однозначно (в нашем случае ключ объединения тривиален или вообще не существует ключа объединения)
Начнем с примеров фреймов данных:
import pyspark.sql.functions as psf
import pyspark.sql.types as pst
df1 = spark.createDataFrame(
[[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())],
schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value1']]))
df2 = spark.createDataFrame(
[[user, value] for user, value in zip(5 * list(range(2)), np.random.randint(0, 100, 10).tolist())],
schema=pst.StructType([pst.StructField(c, pst.IntegerType()) for c in ['user', 'value2']]))
+----+------+
|user|value1|
+----+------+
| 0| 76|
| 1| 59|
| 0| 14|
| 1| 71|
| 0| 66|
| 1| 61|
| 0| 2|
| 1| 22|
| 0| 16|
| 1| 83|
+----+------+
+----+------+
|user|value2|
+----+------+
| 0| 65|
| 1| 81|
| 0| 60|
| 1| 69|
| 0| 21|
| 1| 61|
| 0| 98|
| 1| 76|
| 0| 40|
| 1| 21|
+----+------+
Давайте попробуем объединить кадры данных в постоянном столбце, чтобы увидеть эквивалентность между перекрестным соединением и регулярным соединением в постоянном (тривиальном) столбце:
df = df1.withColumn('key', psf.lit(1)) \
.join(df2.withColumn('key', psf.lit(1)), on=['key'])
Мы получаем ошибку от spark> 2, потому что она понимает, что мы пытаемся выполнить перекрестное соединение (декартово произведение)
Py4JJavaError: Произошла ошибка при вызове o1865.showString.
: org.apache.spark.sql.AnalysisException: обнаружен неявный декартовой продукт для INNER соединения между логическими планами
LogicalRDD [пользователь # 1538, значение1 # 1539], ложь
а также
LogicalRDD [user # 1542, value2 # 1543], false
Условие соединения отсутствует или тривиально.
Либо: используйте синтаксис CROSS JOIN, чтобы разрешить декартовы произведения между этими
отношения, или: включить неявные декартовы произведения, установив конфигурацию
переменная spark.sql.crossJoin.enabled = true;
Если ваш присоединяемый ключ (user
здесь) не является столбцом, который уникально идентифицирует строки, вы также получите умножение строк, но внутри каждой группы user
:
df = df1.join(df2, on='user')
print("Number of rows : \tdf1: {} \tdf2: {} \tdf: {}".format(df1.count(), df2.count(), df.count()))
Number of rows : df1: 10 df2: 10 df: 50
+----+------+------+
|user|value1|value2|
+----+------+------+
| 1| 59| 81|
| 1| 59| 69|
| 1| 59| 61|
| 1| 59| 76|
| 1| 59| 21|
| 1| 71| 81|
| 1| 71| 69|
| 1| 71| 61|
| 1| 71| 76|
| 1| 71| 21|
| 1| 61| 81|
| 1| 61| 69|
| 1| 61| 61|
| 1| 61| 76|
| 1| 61| 21|
| 1| 22| 81|
| 1| 22| 69|
| 1| 22| 61|
| 1| 22| 76|
| 1| 22| 21|
+----+------+------+
5 * 5 строк для пользователя 0
+ 5 * 5 строк для пользователя 1
, следовательно, 50
Примечание: Использование self join
с последующим filter
обычно означает, что вы должны вместо этого использовать оконные функции .