Since I don't have the code that generates joinDF, I prepared a DataFrame myself to stand in for it. I tested the snippet below with ScalaTest, and it works fine.
Please update your code as shown below.
// Note: .toDF on a local Seq also requires `import spark.implicits._`.
val df = Seq(
  ("2018-01-01", "2018-03-01", 2, "contact", "open", "me1@example.com"),
  ("2018-02-01", "2018-03-01", 1, "contact", "open", "me1@example.com"),
  ("2018-01-01", "2018-03-01", 1, "contact", "open", "you@example.com"),
  ("2018-02-01", "2018-03-01", 1, "contact", "open", "me1@example.com"),
  ("2018-01-01", "2018-02-01", 1, "contact", "open", "me1@example.com"),
  ("2018-01-01", "2018-02-01", 1, "contact", "open", "you@example.com")
).toDF("from", "to", "pressroom_id", "category", "event", "email")
df.show()
+----------+----------+------------+--------+-----+---------------+
| from| to|pressroom_id|category|event| email|
+----------+----------+------------+--------+-----+---------------+
|2018-01-01|2018-03-01| 2| contact| open|me1@example.com|
|2018-02-01|2018-03-01| 1| contact| open|me1@example.com|
|2018-01-01|2018-03-01| 1| contact| open|you@example.com|
|2018-02-01|2018-03-01| 1| contact| open|me1@example.com|
|2018-01-01|2018-02-01| 1| contact| open|me1@example.com|
|2018-01-01|2018-02-01| 1| contact| open|you@example.com|
+----------+----------+------------+--------+-----+---------------+
// Keep only opened "contact" events, then count emails per pressroom and date window.
val df1 = df
  .where(col("category") === lit("contact") && col("event") === lit("open"))
  .groupBy("pressroom_id", "from", "to")
  .agg(count("email").cast("integer").as("something"))
df1.show()
+------------+----------+----------+---------+
|pressroom_id| from| to|something|
+------------+----------+----------+---------+
| 2|2018-01-01|2018-03-01| 1|
| 1|2018-01-01|2018-03-01| 1|
| 1|2018-02-01|2018-03-01| 2|
| 1|2018-01-01|2018-02-01| 2|
+------------+----------+----------+---------+
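Note that count("email") counts rows, so the same address appearing twice in one window is counted twice (as you can see for pressroom 1 above). If what you actually need is the number of unique recipients, the standard countDistinct function from org.apache.spark.sql.functions would be the drop-in swap; this is shown only as an alternative, assuming the same df:

// Alternative: count unique addresses per window instead of rows.
val df1Distinct = df
  .where(col("category") === lit("contact") && col("event") === lit("open"))
  .groupBy("pressroom_id", "from", "to")
  .agg(countDistinct("email").cast("integer").as("something"))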
I added this import statement to my code:
import org.apache.spark.sql.functions._
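For completeness, here is a minimal self-contained sketch of what I ran for verification. The local SparkSession setup and the Demo object name are my own scaffolding, not part of your code; a ScalaTest suite would normally provide its own session.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object Demo extends App {
  // Hypothetical local session for a quick check; adjust appName/master to your environment.
  val spark = SparkSession.builder()
    .appName("joinDF-demo")
    .master("local[*]")
    .getOrCreate()

  import spark.implicits._ // needed for .toDF on a local Seq

  val df = Seq(
    ("2018-01-01", "2018-03-01", 2, "contact", "open", "me1@example.com"),
    ("2018-02-01", "2018-03-01", 1, "contact", "open", "me1@example.com"),
    ("2018-01-01", "2018-03-01", 1, "contact", "open", "you@example.com"),
    ("2018-02-01", "2018-03-01", 1, "contact", "open", "me1@example.com"),
    ("2018-01-01", "2018-02-01", 1, "contact", "open", "me1@example.com"),
    ("2018-01-01", "2018-02-01", 1, "contact", "open", "you@example.com")
  ).toDF("from", "to", "pressroom_id", "category", "event", "email")

  val df1 = df
    .where(col("category") === lit("contact") && col("event") === lit("open"))
    .groupBy("pressroom_id", "from", "to")
    .agg(count("email").cast("integer").as("something"))

  df1.show()
  spark.stop()
}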
Hope this helps!