У меня есть потоковый фрейм данных, который в какой-то момент может выглядеть следующим образом:
+--------------------+--------------------+
| owner| fruits|
+--------------------+--------------------+
|Brian | apple|
Brian | pear |
Brian | date|
Brian | avocado|
Bob | avocado|
Bob | apple|
........
+--------------------+--------------------+
Я выполнил groupBy, agg collect_list, чтобы очистить вещи.
val myFarmDF = farmDF.withWatermark("timeStamp", "1 seconds").groupBy("fruits").agg(collect_list(col("fruits")) as "fruitsA")
вывододин ряд для каждого владельца и массив каждого фрукта.Теперь я хотел бы присоединить этот очищенный массив к исходному потоковому фрейму данных, отбрасывая col фруктов и просто имея колонку fruitsA
val joinedDF = farmDF.join(myFarmDF, "owner").drop("fruits")
, это, кажется, работает в моей голове, но спарк, похоже, не согласен.
Я получаю
Failure when resolving conflicting references in Join:
'Join Inner
...
+- AnalysisBarrier
+- Aggregate [name#17], [name#17, collect_list(fruits#61, 0, 0) AS fruitA#142]
Когда я превращаю все в статический фрейм данных, он работает просто отлично.Разве это невозможно в потоковом контексте?