Я реализовал внутреннее соединение, используя Java API Spark. Ожидается, что из двух наборов данных один будет иметь большие объемы данных (около 1 000 000 строк), а другой - около 5000 строк.
DS1
+----+----+----+----+-------+
| C1 | C2 | C3 | C4 | C5 |
+----+----+----+----+-------+
| S1 | 1 | 2 | 6 | 3 |
| S2 | 1 | 3 | 7 | 2 |
| S3 | 1 | 4 | 8 | 11 |
| S4 | 1 | 5 | 9 | null |
+----+----+----+----+-------+
DS2
+----+-----+
| EK | EV |
+----+-----+
| C1 | S2 |
| C4 | 7 |
| C3 | 2 |
| C5 | 2 |
+----+-----+
Expected output
+----+----+----+----+-------+----+----+
| C1 | C2 | C3 | C4 | C5 | EK | EK |
+----+----+----+----+-------+----+----+
| S1 | 1 | 2 | 6 | 3 | C3 | 2 |
| S2 | 1 | 3 | 7 | 2 | C1 | S2 |
| S2 | 1 | 3 | 7 | 2 | C4 | 7 |
| S2 | 1 | 3 | 7 | 2 | C5 | 2 |
| S3 | 1 | 4 | 8 | 11 |null|null|
| S4 | 1 | 5 | 9 | null |null|null|
+----+----+----+----+-------+----+----+
Я пытался покинуть соединение, но это было слишком медленно. Затем мы попробовали альтернативную реализацию с внутренним соединением, но даже внутреннее соединение очень медленное.
левое соединение
Dataset<Row> joinedDS = inputJoinedWithHierarchies.join(broadcast(substitutionExclusionDS),
(column("EK").equalTo(lit("C1")).and(column("EV").equalTo(column("C1"))))
.or((column("EK").equalTo(lit("C2")).and(column("EV").equalTo(column("C2")))))
.or((column("EK").equalTo(lit("C3")).and(column("EV").equalTo(column("C3")))))
.or((column("EK").equalTo(lit("C4")).and(column("EV").equalTo(column("C4")))))
.or((column("EK").equalTo(lit("C5")).and(column("EV").equalTo(column("C5"))))),
"left_outer"
)
.drop(<some_columns>)
внутренний присоединиться
Dataset<Row> joinedDS = inputJoinedWithHierarchies.join(broadcast(substitutionExclusionDS),
(column("EK").equalTo(lit("C1")).and(column("EV").equalTo(column("C1"))))
.or((column("EK").equalTo(lit("C2")).and(column("EV").equalTo(column("C2")))))
.or((column("EK").equalTo(lit("C3")).and(column("EV").equalTo(column("C3")))))
.or((column("EK").equalTo(lit("C4")).and(column("EV").equalTo(column("C4")))))
.or((column("EK").equalTo(lit("C5")).and(column("EV").equalTo(column("C5"))))),
"left_outer"
)
.drop(<some_columns>)
Даже после нескольких альтернативных подходов для замены / улучшения left-outer
объединения, включая широковещательное соединение, все кажется очень медленным, и никакие причины / объяснения не могут быть установлены.
Любая помощь приветствуется. Спасибо!