Вам нужно пересечь, войдите в тот же DS. После вы можете написать предложение where, которое возвращает только строку с другим номером между двумя столбцами и только строку, в которой ANumber меньше BNumber.
Это пример:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Encoders, Row, SparkSession}
import org.scalatest.FunSuite
class Test extends FunSuite {
test("Test spark cross join") {
val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._
val rows = Seq(Row(1),Row(2),Row(3))
val schema = StructType(Seq(StructField("Number",DataTypes.IntegerType)))
val ds = spark.createDataset(rows)(RowEncoder(schema))
val crossJoinDs = ds.select($"Number".as("ANumber"))
.crossJoin(ds.select($"Number".as("BNumber")))
.where($"ANumber" =!= $"BNumber" && $"ANumber" < $"BNumber")
.map(r => String.valueOf(r(0))+","+String.valueOf(r(1)))(Encoders.STRING)
crossJoinDs.show()
}
, которые печатают следующий вывод:
+-----+
|value|
+-----+
| 1,2|
| 1,3|
| 2,3|
+-----+
Когда вы пишете сбор и перебираете результат, вы отправляете все данные в drivernode. В основном вы останавливаете распределенное вычисление произведений.