Почему Spark Crossjoin работает правильно только при кэшировании? - PullRequest
3 голосов
/ 24 мая 2019

Я устранял неполадки в моем приложении Spark (2.3) и понял, что получаю неправильные результаты, если я не кэшировал набор данных после CrossJoin. Знаем ли мы, требуется ли кэшировать набор данных после crossJoin?

Вот как выглядит мой набор данных:

+--------+------------+
|      id| nameGroupId|
+--------+------------+
| joe san|  6302870528|
|john san|936302870528|
+--------+------------+

Когда я выполняю приведенный ниже код с оператором .cache () без комментариев , callUDF ("leftNameTokenComplement") UDF вызывается со следующими параметрами, что я и хотел бы ожидать:

df1.id = "Джо Сан" & df2.id = "Джон Сан"

Но когда я выполняю тот же блок кода с оператором .cache () с комментариями , callUDF ("leftNameTokenComplement") UDF вызывается со следующими параметрами. Это безумие, потому что фильтр () прямо перед UDF остановил бы это.

df1.id = "Джо Сан" & df2.id = "Джо Сан"

Dataset<Row> connectedDf = sparkSession.read().json(this.workspaceDir + "connected-df");

connectedDf.as("df1")
    .crossJoin(connectedDf.as("df2"))
    //.cache() //correct results only when cached!
    .filter( "df1.id < df2.id" )
    .withColumn("leftComp", functions.callUDF("leftNameTokenComplement", col("df1.id"), col("df2.id") ))
    .filter( functions.size(col("leftComp")).equalTo(1) )       
    .withColumn("src1", functions.callUDF("firstInArray", col("leftComp")) )
    .withColumn("matchedPhonic", functions.callUDF("isNameTokenPhonicMatch", col("src1")) )
    .filter( col("matchedPhonic").equalTo(true))
    .show();
...