I am using Spark 2.3.0 and I have two DataFrames.
The first, df1, has the schema:
root
|-- time: long (nullable = true)
|-- channel: string (nullable = false)
The second, df2, has the schema:
root
|-- pprChannel: string (nullable = true)
|-- ppr: integer (nullable = false)
I am now trying to run:
spark.sql("select a.channel as channel, a.time as time, b.ppr as ppr from df1 a inner join df2 b on a.channel = b.pprChannel")
But I get: Detected cartesian product for INNER join between logical plans.
When I try to reproduce this in the Spark shell, both with sc.parallelize and with plain Seqs, it works.
What could be going wrong here?
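For reference, the small-scale check I mentioned looks roughly like this (a minimal sketch with made-up sample data; it assumes spark is the SparkSession and sc its SparkContext, and that the temp views give the SQL above its df1/df2 names). In this form the inner join runs without the error:

// Hypothetical minimal reproduction: tiny DataFrames built via sc.parallelize,
// registered as temp views so the SQL query can refer to them as df1 and df2.
import spark.implicits._

val df1 = sc.parallelize(Seq((1L, "a"), (2L, "b"))).toDF("time", "channel")
val df2 = sc.parallelize(Seq(("a", 1), ("b", 2))).toDF("pprChannel", "ppr")

df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")

spark.sql(
  "select a.channel as channel, a.time as time, b.ppr as ppr " +
  "from df1 a inner join df2 b on a.channel = b.pprChannel"
).show()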
Followup
Here is what I get when I run df1.join(df2, 'channel === 'pprChannel, "inner").explain(true):
== Parsed Logical Plan ==
Join Inner, (channel#124 = pprChannel#136)
:- Project [time#113L AS time#127L, channel#124]
: +- Project [time#113L, unnamed AS channel#124]
: +- Project [time#113L]
: +- Project [channel#23, time#113L]
: +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, clipDT#105L, if ((isnull(t0#93L) || isnull(t1#29L))) null else UDF(t0#93L, t1#29L) AS time#113L]
: +- Filter (clipDT#105L >= cast(50000000 as bigint))
: +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, (t1#29L - t0#93L) AS clipDT#105L]
: +- Filter (((t0#93L >= cast(0 as bigint)) && (pt0#98 = 1)) && (pt1#82 = 2))
: +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98]
: +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, pt0#98]
: +- Window [lag(pt1#82, 1, 0) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS pt0#98], [channel#23], [t1#29L ASC NULLS FIRST]
: +- Project [channel#23, t1#29L, pt1#82, t0#93L]
: +- Project [channel#23, t1#29L, pt1#82, t0#93L]
: +- Project [channel#23, t1#29L, pt1#82, t0#93L, t0#93L]
: +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#93L], [channel#23], [t1#29L ASC NULLS FIRST]
: +- Project [channel#23, t1#29L, pt1#82]
: +- Project [channel#23, t1#29L, pt1#82]
: +- Filter pt1#82 IN (1,2)
: +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75, if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) AS pt1#82]
: +- Filter ((t0#70L >= cast(0 as bigint)) && NOT isnan(dv0#75))
: +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75]
: +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75, dv0#75]
: +- Window [lag(dv1#58, 1, NaN) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS dv0#75], [channel#23], [t1#29L ASC NULLS FIRST]
: +- Project [channel#23, t1#29L, dv1#58, t0#70L]
: +- Project [channel#23, t1#29L, dv1#58, t0#70L]
: +- Project [channel#23, t1#29L, dv1#58, t0#70L, t0#70L]
: +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#70L], [channel#23], [t1#29L ASC NULLS FIRST]
: +- Project [channel#23, t1#29L, dv1#58]
: +- Project [channel#23, t1#29L, dv1#58]
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49, abs(if ((isnull(v0#49) || isnull(v1#35))) null else UDF(v0#49, v1#35)) AS dv1#58]
: +- Filter ((t0#42L >= cast(0 as bigint)) && NOT isnan(v0#49))
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49]
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49, v0#49]
: +- Window [lag(v1#35, 1, NaN) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS v0#49], [channel#23], [t1#29L ASC NULLS FIRST]
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L]
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L]
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, t0#42L]
: +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#42L], [channel#23], [t1#29L ASC NULLS FIRST]
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23]
: +- Filter ((NOT isnull(t1#29L) && NOT isnull(v1#35)) && ((t1#29L >= cast(0 as bigint)) && NOT isnan(v1#35)))
: +- Project [_c0#10, _c1#11, t1#29L, value#18 AS v1#35, channel#23]
: +- Project [_c0#10, _c1#11, time#14L AS t1#29L, value#18, channel#23]
: +- Project [_c0#10, _c1#11, time#14L, value#18, unnamed AS channel#23]
: +- Project [_c0#10, _c1#11, time#14L, UDF(_c1#11) AS value#18]
: +- Project [_c0#10, _c1#11, UDF(_c0#10) AS time#14L]
: +- Relation[_c0#10,_c1#11] csv
+- Project [_1#133 AS pprChannel#136, _2#134 AS ppr#137]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#133, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#134]
+- ExternalRDD [obj#132]
== Analyzed Logical Plan ==
time: bigint, channel: string, pprChannel: string, ppr: int
Join Inner, (channel#124 = pprChannel#136)
:- Project [time#113L AS time#127L, channel#124]
: +- Project [time#113L, unnamed AS channel#124]
: +- Project [time#113L]
: +- Project [channel#23, time#113L]
: +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, clipDT#105L, if ((isnull(t0#93L) || isnull(t1#29L))) null else if ((isnull(t0#93L) || isnull(t1#29L))) null else UDF(t0#93L, t1#29L) AS time#113L]
: +- Filter (clipDT#105L >= cast(50000000 as bigint))
: +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, (t1#29L - t0#93L) AS clipDT#105L]
: +- Filter (((t0#93L >= cast(0 as bigint)) && (pt0#98 = 1)) && (pt1#82 = 2))
: +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98]
: +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, pt0#98]
: +- Window [lag(pt1#82, 1, 0) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS pt0#98], [channel#23], [t1#29L ASC NULLS FIRST]
: +- Project [channel#23, t1#29L, pt1#82, t0#93L]
: +- Project [channel#23, t1#29L, pt1#82, t0#93L]
: +- Project [channel#23, t1#29L, pt1#82, t0#93L, t0#93L]
: +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#93L], [channel#23], [t1#29L ASC NULLS FIRST]
: +- Project [channel#23, t1#29L, pt1#82]
: +- Project [channel#23, t1#29L, pt1#82]
: +- Filter pt1#82 IN (1,2)
: +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75, if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) AS pt1#82]
: +- Filter ((t0#70L >= cast(0 as bigint)) && NOT isnan(dv0#75))
: +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75]
: +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75, dv0#75]
: +- Window [lag(dv1#58, 1, NaN) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS dv0#75], [channel#23], [t1#29L ASC NULLS FIRST]
: +- Project [channel#23, t1#29L, dv1#58, t0#70L]
: +- Project [channel#23, t1#29L, dv1#58, t0#70L]
: +- Project [channel#23, t1#29L, dv1#58, t0#70L, t0#70L]
: +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#70L], [channel#23], [t1#29L ASC NULLS FIRST]
: +- Project [channel#23, t1#29L, dv1#58]
: +- Project [channel#23, t1#29L, dv1#58]
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49, abs(if ((isnull(v0#49) || isnull(v1#35))) null else if ((isnull(v0#49) || isnull(v1#35))) null else UDF(v0#49, v1#35)) AS dv1#58]
: +- Filter ((t0#42L >= cast(0 as bigint)) && NOT isnan(v0#49))
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49]
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49, v0#49]
: +- Window [lag(v1#35, 1, NaN) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS v0#49], [channel#23], [t1#29L ASC NULLS FIRST]
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L]
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L]
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, t0#42L]
: +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#42L], [channel#23], [t1#29L ASC NULLS FIRST]
: +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23]
: +- Filter ((NOT isnull(t1#29L) && NOT isnull(v1#35)) && ((t1#29L >= cast(0 as bigint)) && NOT isnan(v1#35)))
: +- Project [_c0#10, _c1#11, t1#29L, value#18 AS v1#35, channel#23]
: +- Project [_c0#10, _c1#11, time#14L AS t1#29L, value#18, channel#23]
: +- Project [_c0#10, _c1#11, time#14L, value#18, unnamed AS channel#23]
: +- Project [_c0#10, _c1#11, time#14L, UDF(_c1#11) AS value#18]
: +- Project [_c0#10, _c1#11, UDF(_c0#10) AS time#14L]
: +- Relation[_c0#10,_c1#11] csv
+- Project [_1#133 AS pprChannel#136, _2#134 AS ppr#137]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#133, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#134]
+- ExternalRDD [obj#132]
== Optimized Logical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [UDF(t0#93L, t1#29L) AS time#127L, unnamed AS channel#124]
+- Filter ((isnotnull(pt0#98) && isnotnull(pt1#82)) && ((((t0#93L >= 0) && (pt0#98 = 1)) && (pt1#82 = 2)) && ((t1#29L - t0#93L) >= 50000000)))
+- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#93L, lag(pt1#82, 1, 0) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS pt0#98], [unnamed], [t1#29L ASC NULLS FIRST]
+- Project [t1#29L, if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) AS pt1#82]
+- Filter (((t0#70L >= 0) && NOT isnan(dv0#75)) && if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) IN (1,2))
+- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#70L, lag(dv1#58, 1, NaN) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS dv0#75], [unnamed], [t1#29L ASC NULLS FIRST]
+- Project [t1#29L, abs(UDF(v0#49, v1#35)) AS dv1#58]
+- Filter ((t0#42L >= 0) && NOT isnan(v0#49))
+- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#42L, lag(v1#35, 1, NaN) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS v0#49], [unnamed], [t1#29L ASC NULLS FIRST]
+- Project [UDF(_c0#10) AS t1#29L, UDF(_c1#11) AS v1#35]
+- Filter ((UDF(_c0#10) >= 0) && NOT isnan(UDF(_c1#11)))
+- Relation[_c0#10,_c1#11] csv
and
Project [_1#133 AS pprChannel#136, _2#134 AS ppr#137]
+- Filter (isnotnull(_1#133) && (unnamed = _1#133))
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#133, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#134]
+- ExternalRDD [obj#132]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
== Physical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [UDF(t0#93L, t1#29L) AS time#127L, unnamed AS channel#124]
+- Filter ((isnotnull(pt0#98) && isnotnull(pt1#82)) && ((((t0#93L >= 0) && (pt0#98 = 1)) && (pt1#82 = 2)) && ((t1#29L - t0#93L) >= 50000000)))
+- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#93L, lag(pt1#82, 1, 0) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS pt0#98], [unnamed], [t1#29L ASC NULLS FIRST]
+- Project [t1#29L, if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) AS pt1#82]
+- Filter (((t0#70L >= 0) && NOT isnan(dv0#75)) && if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) IN (1,2))
+- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#70L, lag(dv1#58, 1, NaN) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS dv0#75], [unnamed], [t1#29L ASC NULLS FIRST]
+- Project [t1#29L, abs(UDF(v0#49, v1#35)) AS dv1#58]
+- Filter ((t0#42L >= 0) && NOT isnan(v0#49))
+- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#42L, lag(v1#35, 1, NaN) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS v0#49], [unnamed], [t1#29L ASC NULLS FIRST]
+- Project [UDF(_c0#10) AS t1#29L, UDF(_c1#11) AS v1#35]
+- Filter ((UDF(_c0#10) >= 0) && NOT isnan(UDF(_c1#11)))
+- Relation[_c0#10,_c1#11] csv
and
Project [_1#133 AS pprChannel#136, _2#134 AS ppr#137]
+- Filter (isnotnull(_1#133) && (unnamed = _1#133))
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#133, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#134]
+- ExternalRDD [obj#132]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
Yes, df1 is the result of a fairly complex computation, which is why its plan is so big. df2 is a very small DF that always comes from a Map with at most 50-100 entries, brought into Spark with sc.parallelize. So I could use crossJoin plus where as a workaround. But I want to understand why Spark considers this a cartesian product.
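The workaround I have in mind would be roughly the following (a sketch only, assuming the same df1 and df2 as above; the explicit crossJoin operator is the CROSS JOIN syntax the error message refers to):

import org.apache.spark.sql.functions.col

// Sketch of the crossJoin + where workaround: declare the product explicitly,
// then re-apply the channel equality as a plain filter afterwards.
val res = df1
  .crossJoin(df2)
  .where(col("channel") === col("pprChannel"))
  .select("channel", "time", "ppr")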
Followup 2
I am now using a different approach. Since the first DF is the huge one resulting from the complex computation, and the second always comes from a small map, I changed my algorithm to use plain map operations to achieve the same thing:
// Broadcast the small map once so every partition can look channels up locally.
val bDF2Data = sc.broadcast(df2Data)

val res =
  df1.
    as[(Long, String)].
    mapPartitions { iter =>
      val df2Data = bDF2Data.value
      iter.
        flatMap {
          case (time, channel) =>
            // Keep a row only if the channel exists in the map (inner-join semantics).
            df2Data.get(channel).map(ppr => (time, channel, ppr))
        }
    }.
    toDF("time", "channel", "ppr").
    // More operations ...