Spark detects a cartesian product for an INNER join even though the join condition is non-trivial

I am using Spark 2.3.0 and I have two dataframes.

The first, df1, has this schema:

root
 |-- time: long (nullable = true)
 |-- channel: string (nullable = false)

The second, df2, has this schema:

root
 |-- pprChannel: string (nullable = true)
 |-- ppr: integer (nullable = false)

I am now trying to run:

spark.sql("select a.channel as channel, a.time as time, b.ppr as ppr from df1 a inner join df2 b on a.channel = b.pprChannel")

But I get: Detected cartesian product for INNER join between logical plans.

When I try to reproduce this in the Spark shell, both with sc.parallelize and with plain Seqs, it works.
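
For reference, this is roughly the kind of shell reproduction that works (a sketch; the sample rows are made up, and the temp-view registration is shown explicitly since spark.sql can only resolve df1 and df2 through views):

// In spark-shell, spark.implicits._ is already in scope (needed for toDF).
val df1 = Seq((1000L, "ch1"), (2000L, "ch2")).toDF("time", "channel")
val df2 = sc.parallelize(Seq(("ch1", 1), ("ch2", 2))).toDF("pprChannel", "ppr")
// Register both frames so spark.sql can see them by name.
df1.createOrReplaceTempView("df1")
df2.createOrReplaceTempView("df2")
spark.sql("select a.channel as channel, a.time as time, b.ppr as ppr from df1 a inner join df2 b on a.channel = b.pprChannel").show()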

What could be wrong here?

Follow-up 1

Here is what I get when I run df1.join(df2, 'channel === 'pprChannel, "inner").explain(true):

== Parsed Logical Plan ==
Join Inner, (channel#124 = pprChannel#136)
:- Project [time#113L AS time#127L, channel#124]
:  +- Project [time#113L, unnamed AS channel#124]
:     +- Project [time#113L]
:        +- Project [channel#23, time#113L]
:           +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, clipDT#105L, if ((isnull(t0#93L) || isnull(t1#29L))) null else UDF(t0#93L, t1#29L) AS time#113L]
:              +- Filter (clipDT#105L >= cast(50000000 as bigint))
:                 +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, (t1#29L - t0#93L) AS clipDT#105L]
:                    +- Filter (((t0#93L >= cast(0 as bigint)) && (pt0#98 = 1)) && (pt1#82 = 2))
:                       +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98]
:                          +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, pt0#98]
:                             +- Window [lag(pt1#82, 1, 0) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS pt0#98], [channel#23], [t1#29L ASC NULLS FIRST]
:                                +- Project [channel#23, t1#29L, pt1#82, t0#93L]
:                                   +- Project [channel#23, t1#29L, pt1#82, t0#93L]
:                                      +- Project [channel#23, t1#29L, pt1#82, t0#93L, t0#93L]
:                                         +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#93L], [channel#23], [t1#29L ASC NULLS FIRST]
:                                            +- Project [channel#23, t1#29L, pt1#82]
:                                               +- Project [channel#23, t1#29L, pt1#82]
:                                                  +- Filter pt1#82 IN (1,2)
:                                                     +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75, if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) AS pt1#82]
:                                                        +- Filter ((t0#70L >= cast(0 as bigint)) && NOT isnan(dv0#75))
:                                                           +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75]
:                                                              +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75, dv0#75]
:                                                                 +- Window [lag(dv1#58, 1, NaN) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS dv0#75], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                    +- Project [channel#23, t1#29L, dv1#58, t0#70L]
:                                                                       +- Project [channel#23, t1#29L, dv1#58, t0#70L]
:                                                                          +- Project [channel#23, t1#29L, dv1#58, t0#70L, t0#70L]
:                                                                             +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#70L], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                                +- Project [channel#23, t1#29L, dv1#58]
:                                                                                   +- Project [channel#23, t1#29L, dv1#58]
:                                                                                      +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49, abs(if ((isnull(v0#49) || isnull(v1#35))) null else UDF(v0#49, v1#35)) AS dv1#58]
:                                                                                         +- Filter ((t0#42L >= cast(0 as bigint)) && NOT isnan(v0#49))
:                                                                                            +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49]
:                                                                                               +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49, v0#49]
:                                                                                                  +- Window [lag(v1#35, 1, NaN) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS v0#49], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                                                     +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L]
:                                                                                                        +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L]
:                                                                                                           +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, t0#42L]
:                                                                                                              +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#42L], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                                                                 +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23]
:                                                                                                                    +- Filter ((NOT isnull(t1#29L) && NOT isnull(v1#35)) && ((t1#29L >= cast(0 as bigint)) && NOT isnan(v1#35)))
:                                                                                                                       +- Project [_c0#10, _c1#11, t1#29L, value#18 AS v1#35, channel#23]
:                                                                                                                          +- Project [_c0#10, _c1#11, time#14L AS t1#29L, value#18, channel#23]
:                                                                                                                             +- Project [_c0#10, _c1#11, time#14L, value#18, unnamed AS channel#23]
:                                                                                                                                +- Project [_c0#10, _c1#11, time#14L, UDF(_c1#11) AS value#18]
:                                                                                                                                   +- Project [_c0#10, _c1#11, UDF(_c0#10) AS time#14L]
:                                                                                                                                      +- Relation[_c0#10,_c1#11] csv
+- Project [_1#133 AS pprChannel#136, _2#134 AS ppr#137]
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#133, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#134]
      +- ExternalRDD [obj#132]

== Analyzed Logical Plan ==
time: bigint, channel: string, pprChannel: string, ppr: int
Join Inner, (channel#124 = pprChannel#136)
:- Project [time#113L AS time#127L, channel#124]
:  +- Project [time#113L, unnamed AS channel#124]
:     +- Project [time#113L]
:        +- Project [channel#23, time#113L]
:           +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, clipDT#105L, if ((isnull(t0#93L) || isnull(t1#29L))) null else if ((isnull(t0#93L) || isnull(t1#29L))) null else UDF(t0#93L, t1#29L) AS time#113L]
:              +- Filter (clipDT#105L >= cast(50000000 as bigint))
:                 +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, (t1#29L - t0#93L) AS clipDT#105L]
:                    +- Filter (((t0#93L >= cast(0 as bigint)) && (pt0#98 = 1)) && (pt1#82 = 2))
:                       +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98]
:                          +- Project [channel#23, t1#29L, pt1#82, t0#93L, pt0#98, pt0#98]
:                             +- Window [lag(pt1#82, 1, 0) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS pt0#98], [channel#23], [t1#29L ASC NULLS FIRST]
:                                +- Project [channel#23, t1#29L, pt1#82, t0#93L]
:                                   +- Project [channel#23, t1#29L, pt1#82, t0#93L]
:                                      +- Project [channel#23, t1#29L, pt1#82, t0#93L, t0#93L]
:                                         +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#93L], [channel#23], [t1#29L ASC NULLS FIRST]
:                                            +- Project [channel#23, t1#29L, pt1#82]
:                                               +- Project [channel#23, t1#29L, pt1#82]
:                                                  +- Filter pt1#82 IN (1,2)
:                                                     +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75, if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) AS pt1#82]
:                                                        +- Filter ((t0#70L >= cast(0 as bigint)) && NOT isnan(dv0#75))
:                                                           +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75]
:                                                              +- Project [channel#23, t1#29L, dv1#58, t0#70L, dv0#75, dv0#75]
:                                                                 +- Window [lag(dv1#58, 1, NaN) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS dv0#75], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                    +- Project [channel#23, t1#29L, dv1#58, t0#70L]
:                                                                       +- Project [channel#23, t1#29L, dv1#58, t0#70L]
:                                                                          +- Project [channel#23, t1#29L, dv1#58, t0#70L, t0#70L]
:                                                                             +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#70L], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                                +- Project [channel#23, t1#29L, dv1#58]
:                                                                                   +- Project [channel#23, t1#29L, dv1#58]
:                                                                                      +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49, abs(if ((isnull(v0#49) || isnull(v1#35))) null else if ((isnull(v0#49) || isnull(v1#35))) null else UDF(v0#49, v1#35)) AS dv1#58]
:                                                                                         +- Filter ((t0#42L >= cast(0 as bigint)) && NOT isnan(v0#49))
:                                                                                            +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49]
:                                                                                               +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, v0#49, v0#49]
:                                                                                                  +- Window [lag(v1#35, 1, NaN) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS v0#49], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                                                     +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L]
:                                                                                                        +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L]
:                                                                                                           +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23, t0#42L, t0#42L]
:                                                                                                              +- Window [lag(t1#29L, 1, -1) windowspecdefinition(channel#23, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#42L], [channel#23], [t1#29L ASC NULLS FIRST]
:                                                                                                                 +- Project [_c0#10, _c1#11, t1#29L, v1#35, channel#23]
:                                                                                                                    +- Filter ((NOT isnull(t1#29L) && NOT isnull(v1#35)) && ((t1#29L >= cast(0 as bigint)) && NOT isnan(v1#35)))
:                                                                                                                       +- Project [_c0#10, _c1#11, t1#29L, value#18 AS v1#35, channel#23]
:                                                                                                                          +- Project [_c0#10, _c1#11, time#14L AS t1#29L, value#18, channel#23]
:                                                                                                                             +- Project [_c0#10, _c1#11, time#14L, value#18, unnamed AS channel#23]
:                                                                                                                                +- Project [_c0#10, _c1#11, time#14L, UDF(_c1#11) AS value#18]
:                                                                                                                                   +- Project [_c0#10, _c1#11, UDF(_c0#10) AS time#14L]
:                                                                                                                                      +- Relation[_c0#10,_c1#11] csv
+- Project [_1#133 AS pprChannel#136, _2#134 AS ppr#137]
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true, false) AS _1#133, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#134]
      +- ExternalRDD [obj#132]

== Optimized Logical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [UDF(t0#93L, t1#29L) AS time#127L, unnamed AS channel#124]
+- Filter ((isnotnull(pt0#98) && isnotnull(pt1#82)) && ((((t0#93L >= 0) && (pt0#98 = 1)) && (pt1#82 = 2)) && ((t1#29L - t0#93L) >= 50000000)))
   +- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#93L, lag(pt1#82, 1, 0) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS pt0#98], [unnamed], [t1#29L ASC NULLS FIRST]
      +- Project [t1#29L, if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) AS pt1#82]
         +- Filter (((t0#70L >= 0) && NOT isnan(dv0#75)) && if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) IN (1,2))
            +- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#70L, lag(dv1#58, 1, NaN) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS dv0#75], [unnamed], [t1#29L ASC NULLS FIRST]
               +- Project [t1#29L, abs(UDF(v0#49, v1#35)) AS dv1#58]
                  +- Filter ((t0#42L >= 0) && NOT isnan(v0#49))
                     +- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#42L, lag(v1#35, 1, NaN) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS v0#49], [unnamed], [t1#29L ASC NULLS FIRST]
                        +- Project [UDF(_c0#10) AS t1#29L, UDF(_c1#11) AS v1#35]
                           +- Filter ((UDF(_c0#10) >= 0) && NOT isnan(UDF(_c1#11)))
                              +- Relation[_c0#10,_c1#11] csv
and
Project [_1#133 AS pprChannel#136, _2#134 AS ppr#137]
+- Filter (isnotnull(_1#133) && (unnamed = _1#133))
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#133, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#134]
      +- ExternalRDD [obj#132]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
== Physical Plan ==
org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans
Project [UDF(t0#93L, t1#29L) AS time#127L, unnamed AS channel#124]
+- Filter ((isnotnull(pt0#98) && isnotnull(pt1#82)) && ((((t0#93L >= 0) && (pt0#98 = 1)) && (pt1#82 = 2)) && ((t1#29L - t0#93L) >= 50000000)))
   +- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#93L, lag(pt1#82, 1, 0) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS pt0#98], [unnamed], [t1#29L ASC NULLS FIRST]
      +- Project [t1#29L, if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) AS pt1#82]
         +- Filter (((t0#70L >= 0) && NOT isnan(dv0#75)) && if ((isnull(dv0#75) || isnull(dv1#58))) null else if ((isnull(dv0#75) || isnull(dv1#58))) null else UDF(dv0#75, dv1#58) IN (1,2))
            +- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#70L, lag(dv1#58, 1, NaN) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS dv0#75], [unnamed], [t1#29L ASC NULLS FIRST]
               +- Project [t1#29L, abs(UDF(v0#49, v1#35)) AS dv1#58]
                  +- Filter ((t0#42L >= 0) && NOT isnan(v0#49))
                     +- Window [lag(t1#29L, 1, -1) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS t0#42L, lag(v1#35, 1, NaN) windowspecdefinition(unnamed, t1#29L ASC NULLS FIRST, specifiedwindowframe(RowFrame, -1, -1)) AS v0#49], [unnamed], [t1#29L ASC NULLS FIRST]
                        +- Project [UDF(_c0#10) AS t1#29L, UDF(_c1#11) AS v1#35]
                           +- Filter ((UDF(_c0#10) >= 0) && NOT isnan(UDF(_c1#11)))
                              +- Relation[_c0#10,_c1#11] csv
and
Project [_1#133 AS pprChannel#136, _2#134 AS ppr#137]
+- Filter (isnotnull(_1#133) && (unnamed = _1#133))
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, scala.Tuple2, true])._1, true, false) AS _1#133, assertnotnull(input[0, scala.Tuple2, true])._2 AS _2#134]
      +- ExternalRDD [obj#132]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;

Yes, df1 is the result of a fairly complex computation, which is why the plan is so large. df2 is a very small DF that always comes from a Map with at most 50-100 entries, shipped into Spark via sc.parallelize. So I could use crossJoin plus where as a workaround. But I want to understand why Spark thinks this is a cartesian product.
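
For completeness, the crossJoin workaround I have in mind would look roughly like this (a sketch; it is only acceptable because df2 never exceeds 50-100 rows, and setting spark.sql.crossJoin.enabled=true would have a similar unblocking effect):

import spark.implicits._

// Explicit cross join followed by the filter; same result as the inner join.
val res = df1.crossJoin(df2)
  .where('channel === 'pprChannel)
  .select('channel, 'time, 'ppr)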

Follow-up 2

I am now using a different approach. Since the first DF is the huge one resulting from a complex computation, and the second always comes from a small map, I changed my algorithm to achieve the join with plain map operations instead:

// df2Data: the small Map[String, Int] (channel -> ppr) that df2 was built from.
val bDF2Data = sc.broadcast(df2Data)
val res =
  df1.
    as[(Long, String)].   // (time, channel)
    mapPartitions { iter =>
      val df2Data = bDF2Data.value
      iter.
        flatMap {
          // Look up each channel in the broadcast map; rows without a match
          // are dropped, which matches the inner-join semantics.
          case (time, channel) =>
            df2Data.get(channel).map(ppr => (time, channel, ppr))
        }
    }.
    toDF("time", "channel", "ppr").
    // More operations ...
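
Since df2 is this small, Spark's built-in broadcast join hint is another direction without leaving the DataFrame API (a sketch; whether the hint sidesteps the cartesian-product check for this particular plan is untested here):

import org.apache.spark.sql.functions.broadcast
import spark.implicits._

// Mark df2 as broadcastable so the planner prefers a broadcast hash join.
val res2 = df1.join(broadcast(df2), 'channel === 'pprChannel, "inner")
  .select('channel, 'time, 'ppr)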